From bb5e80bcffd59b1e6713cfaf7f2233362bb9873a Mon Sep 17 00:00:00 2001 From: Karun Veluru Date: Fri, 31 Oct 2025 19:30:42 -0500 Subject: [PATCH 1/2] feature(examples): Add Planday connector --- README.md | 1 + connectors/planday/README.md | 97 +++++ connectors/planday/configuration.json | 10 + connectors/planday/connector.py | 573 ++++++++++++++++++++++++++ 4 files changed, 681 insertions(+) create mode 100644 connectors/planday/README.md create mode 100644 connectors/planday/configuration.json create mode 100644 connectors/planday/connector.py diff --git a/README.md b/README.md index 86bade28c..edfd44294 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ These connectors are ready to use out of the box, requiring minimal modification - [odata_version_4](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/odata_api/odata_version_4) - This is an example of how to sync data from an OData API version 4 using Connector SDK. - [odata_version_4_using_python_odata](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/odata_api/odata_version_4_using_python_odata) - This is an example of how to sync data from an OData API version 4 using python-odata python library. - [pindrop](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/pindrop) - This is an example of how to sync nightly report data from Pindrop using Connector SDK. You need to provide your Pindrop client ID and client Secret for this example to work. +- [planday](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/planday) - This example shows how to sync employee, department, and skills data from Planday HR API by using Connector SDK. You need to provide your Planday HR API key for this example to work. - [rabbitmq](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/rabbitmq) - This example shows how to sync messages from RabbitMQ queues using Connector SDK. It uses the `pika` library to connect to RabbitMQ and fetch messages from specified queues. You need to provide your RabbitMQ connection URL for this example to work. - Redshift - [simple_redshift_connector](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/redshift/simple_redshift_connector) - This example shows how to sync records from Redshift by using Connector SDK. You need to provide your Redshift credentials for this example to work. diff --git a/connectors/planday/README.md b/connectors/planday/README.md new file mode 100644 index 000000000..134bbdc0d --- /dev/null +++ b/connectors/planday/README.md @@ -0,0 +1,97 @@ +# Planday HR API Connector Example + +## Connector overview +This connector syncs employee, department, and skills data from the Planday HR API to your destination warehouse. It demonstrates memory-efficient streaming patterns for large datasets, implements comprehensive error handling with exponential backoff retry logic, and supports both initial and incremental synchronization using timestamp-based cursors. The connector fetches data from three core endpoints: `/employees` for staff information, `/departments` for organizational structure, and `/skills` for competency tracking. 
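+
+At the HTTP level, each sync issues paginated GET requests such as the following (illustrative request shape only; see `connector.py` for the exact parameters):
+```
+GET https://openapi.planday.com/api/hr/employees?limit=100&offset=0&updatedAfter=<start>&updatedBefore=<end>
+Authorization: Bearer <api_key>
+```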
+
+## Requirements
+- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): **3.9-3.13**
+- Operating system:
+  - Windows: 10 or later (64-bit only)
+  - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64])
+  - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64)
+
+## Getting started
+Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started.
+
+## Features
+- Syncs employee personal information, job details, and compensation data from the Planday HR API
+- Bearer token authentication with automatic retry logic for failed requests (refer to the `execute_api_request` function)
+- Offset-based pagination with automatic page traversal (refer to the `get_employees`, `get_departments`, and `get_skills` functions)
+- Memory-efficient streaming prevents data accumulation for large employee datasets
+- Incremental synchronization using timestamp-based cursors (refer to the `get_time_range` function)
+- Comprehensive error handling with exponential backoff retry logic (refer to the `__handle_rate_limit` and `__handle_request_error` functions)
+- Data quality validation and field mapping for a consistent database schema (refer to the `__map_employee_data`, `__map_department_data`, and `__map_skill_data` functions)
+
+## Configuration file
+```json
+{
+  "api_key": "",
+  "sync_frequency_hours": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_incremental_sync": "",
+  "enable_debug_logging": ""
+}
+```
+
+### Configuration parameters
+- `api_key` (required): Your Planday HR API authentication key
+- `sync_frequency_hours` (optional): How often to run incremental syncs, defaults to 4 hours
+- `initial_sync_days` (optional): Number of days to sync during initial sync, defaults to 90 days
+- `max_records_per_page` (optional): Maximum records per API request, defaults to 100 (range: 1-1000)
+- `request_timeout_seconds` (optional): HTTP request timeout in seconds, defaults to 30
+- `retry_attempts` (optional): Number of retry attempts for failed requests, defaults to 3
+- `enable_incremental_sync` (optional): Enable timestamp-based incremental sync, defaults to true
+- `enable_debug_logging` (optional): Enable detailed debug logging, defaults to false
+
+## Requirements file
+This connector does not require any additional packages beyond those provided by the Fivetran environment.
+
+Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`.
+
+## Authentication
+1. Log in to the [Planday HR Developer Portal](https://openapi.planday.com/api/hr).
+2. Navigate to your application settings or API credentials section.
+3. Generate a new API key or retrieve your existing authentication token.
+4. Make a note of the `api_key` from your application settings.
+5. Ensure your API key has appropriate permissions for reading employee, department, and skills data.
+
+Note: The connector uses Bearer token authentication with automatic retry logic for failed requests. API keys are never logged or exposed in plain text for security.
+
+## Pagination
+Offset-based pagination with automatic page traversal (refer to `get_employees`, `get_departments`, and `get_skills` functions). Generator-based processing prevents memory accumulation for large employee datasets. Processes pages sequentially while yielding individual records for immediate processing. Each endpoint supports configurable page sizes through the `max_records_per_page` setting.
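+
+A condensed sketch of the pattern these functions implement (generic names for illustration; the real functions also apply the incremental time window and map fields):
+```python
+def paginate(fetch_page, page_size=100):
+    """Yield records from an offset-paginated endpoint one at a time."""
+    offset = 0
+    while True:
+        data = fetch_page(limit=page_size, offset=offset)
+        if not data:
+            break
+        for record in data:
+            yield record  # stream records instead of accumulating them in memory
+        if len(data) < page_size:
+            break
+        offset += page_size
+```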
+
+## Data handling
+Employee, department, and skills data is mapped from Planday HR API format to normalized database columns (refer to the `__map_employee_data`, `__map_department_data`, and `__map_skill_data` functions). Nested objects are flattened, and all timestamps are converted to UTC format for consistency across different time zones.
+
+Supports timestamp-based incremental synchronization using the `last_sync_time` state parameter (refer to the `get_time_range` function). Initial sync can be configured to fetch up to 365 days of historical data. Memory-efficient streaming processes individual records without accumulating large datasets in memory.
+
+## Error handling
+- 429 Too Many Requests: Automatic retry with `Retry-After` header support (refer to the `__handle_rate_limit` function)
+- Timeout handling with configurable retry attempts (refer to the `__handle_request_error` function)
+- Exponential backoff with jitter prevents many clients from retrying at the same moment (the thundering herd problem)
+- Invalid configuration values fall back to documented defaults instead of failing the sync (refer to the `__get_config_int` function)
+- Network connectivity issues are handled with progressive retry delays and comprehensive logging
+
+## Tables created
+| Table | Primary Key | Description |
+|-------|-------------|-------------|
+| EMPLOYEES | `id` | Employee personal information, job details, and compensation data |
+| DEPARTMENTS | `id` | Organizational departments with management structure and budget information |
+| SKILLS | `id` | Employee skills and competencies with proficiency levels and categories |
+
+Column types are automatically inferred by Fivetran. Sample columns include `first_name`, `last_name`, `email`, `department_id`, `hire_date`, `position`, `salary`, `hourly_rate`, `manager_id`, `name`, `description`, `budget`, `cost_center`, `category`, `level`, `certification_required`.
+
+## Additional files
+
+The connector includes several additional files to support functionality, testing, and deployment:
+
+- `requirements.txt` – Python dependency specification for Planday HR API integration and connector requirements including faker for mock testing.
+
+- `configuration.json` – Configuration template for API credentials and connector parameters (should be excluded from version control).
+
+
+## Additional considerations
+The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
\ No newline at end of file
diff --git a/connectors/planday/configuration.json b/connectors/planday/configuration.json
new file mode 100644
index 000000000..0b8466225
--- /dev/null
+++ b/connectors/planday/configuration.json
@@ -0,0 +1,10 @@
+{
+  "api_key": "",
+  "sync_frequency_hours": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_incremental_sync": "",
+  "enable_debug_logging": ""
+}
\ No newline at end of file
diff --git a/connectors/planday/connector.py b/connectors/planday/connector.py
new file mode 100644
index 000000000..238af4c95
--- /dev/null
+++ b/connectors/planday/connector.py
@@ -0,0 +1,573 @@
+"""Planday HR API connector for syncing employee, department, and skills data.
+This connector demonstrates how to fetch data from the Planday HR API and upsert it into the destination using memory-efficient streaming patterns.
+See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update)
+and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details.
+"""
+
+# For reading configuration from a JSON file
+import json
+
+# Import required classes from fivetran_connector_sdk
+from fivetran_connector_sdk import Connector
+
+# For enabling Logs in your connector code
+from fivetran_connector_sdk import Logging as log
+
+# For supporting Data operations like Upsert(), Update(), Delete() and checkpoint()
+from fivetran_connector_sdk import Operations as op
+
+# For making HTTP requests to the Planday HR API
+import requests
+
+# For handling dates and timestamps
+from datetime import datetime, timedelta, timezone
+
+# For implementing delays in retry logic and rate limiting
+import time
+
+# For adding jitter to retry delays
+import random
+
+# Private module constants
+__API_ENDPOINT = "https://openapi.planday.com/api/hr"  # Planday HR API base URL
+
+
+def __get_config_int(configuration, key, default, min_val=None, max_val=None):
+    """
+    Extract and validate integer configuration parameters with range checking.
+    This function safely extracts integer values from configuration and applies validation.
+
+    Args:
+        configuration: Configuration dictionary containing connector settings.
+        key: The configuration key to extract.
+        default: Default value to return if key is missing or invalid.
+        min_val: Minimum allowed value (optional).
+        max_val: Maximum allowed value (optional).
+
+    Returns:
+        int: The validated integer value or default if validation fails.
+    """
+    try:
+        value = int(configuration.get(key, default))
+        if min_val is not None and value < min_val:
+            return default
+        if max_val is not None and value > max_val:
+            return default
+        return value
+    except (ValueError, TypeError):
+        return default
+
+
+def __get_config_str(configuration, key, default=""):
+    """
+    Extract string configuration parameters with type safety.
+    This function safely extracts string values from the configuration dictionary.
+
+    Args:
+        configuration: Configuration dictionary containing connector settings.
+        key: The configuration key to extract.
+        default: Default value to return if key is missing.
+
+    Returns:
+        str: The string value or default if key is missing.
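+
+    Example (illustrative values):
+        >>> __get_config_str({"api_key": "abc123"}, "api_key")
+        'abc123'
+        >>> __get_config_str({}, "missing_key", "fallback")
+        'fallback'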
+ """ + return str(configuration.get(key, default)) + + +def __get_config_bool(configuration, key, default=False): + """ + Extract and parse boolean configuration parameters from strings or boolean values. + This function handles string representations of boolean values commonly used in JSON configuration. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default boolean value to return if key is missing. + + Returns: + bool: The parsed boolean value or default if key is missing. + """ + value = configuration.get(key, default) + if isinstance(value, str): + return value.lower() in ("true", "1", "yes", "on") + return bool(value) + + +def __calculate_wait_time(attempt, response_headers, base_delay=1, max_delay=60): + """ + Calculate exponential backoff wait time with jitter for retry attempts. + This function implements exponential backoff with random jitter to prevent thundering herd problems. + + Args: + attempt: Current attempt number (0-based). + response_headers: HTTP response headers dictionary that may contain Retry-After. + base_delay: Base delay in seconds for exponential backoff. + max_delay: Maximum delay cap in seconds. + + Returns: + float: Wait time in seconds before next retry attempt. + """ + if "Retry-After" in response_headers: + return min(int(response_headers["Retry-After"]), max_delay) + + # Exponential backoff with jitter + wait_time = min(base_delay * (2**attempt), max_delay) + jitter = random.uniform(0.1, 0.3) * wait_time + return wait_time + jitter + + +def __handle_rate_limit(attempt, response): + """ + Handle HTTP 429 rate limiting responses with appropriate delays. + This function logs the rate limit and waits before allowing retry attempts. + + Args: + attempt: Current attempt number for logging purposes. + response: HTTP response object containing rate limit headers. + """ + wait_time = __calculate_wait_time(attempt, response.headers) + log.warning(f"Rate limit hit, waiting {wait_time:.1f} seconds before retry {attempt + 1}") + time.sleep(wait_time) + + +def __handle_request_error(attempt, retry_attempts, error, endpoint): + """ + Handle request errors with exponential backoff retry logic. + This function manages retry attempts for failed API requests with appropriate delays. + + Args: + attempt: Current attempt number (0-based). + retry_attempts: Total number of retry attempts allowed. + error: The exception that occurred during the request. + endpoint: API endpoint that failed for logging purposes. + + Raises: + Exception: Re-raises the original error after all retry attempts are exhausted. + """ + if attempt < retry_attempts - 1: + wait_time = __calculate_wait_time(attempt, {}) + log.warning( + f"Request failed for {endpoint}: {str(error)}. Retrying in {wait_time:.1f} seconds..." + ) + time.sleep(wait_time) + else: + log.severe(f"All retry attempts failed for {endpoint}: {str(error)}") + raise error + + +def execute_api_request(endpoint, api_key, params=None, configuration=None): + """ + Execute HTTP API requests with comprehensive error handling and retry logic. + This function handles authentication, rate limiting, timeouts, and network errors. + + Args: + endpoint: API endpoint path to request. + api_key: Authentication key for API access. + params: Query parameters for the request (optional). + configuration: Configuration dictionary for timeout and retry settings. + + Returns: + dict: Parsed JSON response from the API. 
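+
+    Example (illustrative call, mirroring how the sync functions use it):
+        response = execute_api_request("/employees", api_key, params={"limit": 100, "offset": 0}, configuration=configuration)
+        records = response.get("data", [])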
+ + Raises: + RuntimeError: If all retry attempts fail or unexpected errors occur. + requests.exceptions.RequestException: For unrecoverable HTTP errors. + """ + url = f"{__API_ENDPOINT}{endpoint}" + headers = {"Authorization": f"Bearer {api_key}"} + + timeout = __get_config_int(configuration, "request_timeout_seconds", 30) + retry_attempts = __get_config_int(configuration, "retry_attempts", 3) + + for attempt in range(retry_attempts): + try: + response = requests.get(url, headers=headers, params=params, timeout=timeout) + + if response.status_code == 429: + __handle_rate_limit(attempt, response) + continue + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + __handle_request_error(attempt, retry_attempts, e, endpoint) + continue + + raise RuntimeError("Unexpected error in API request execution") + + +def __map_employee_data(record): + """ + Transform API response record to employee table schema format. + This function maps raw API employee fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for employee data. + + Returns: + dict: Transformed employee record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "employee_number": record.get("employeeNumber", ""), + "first_name": record.get("firstName", ""), + "last_name": record.get("lastName", ""), + "email": record.get("email", ""), + "phone": record.get("phone", ""), + "department_id": record.get("departmentId", ""), + "position": record.get("position", ""), + "hire_date": record.get("hireDate", ""), + "employment_type": record.get("employmentType", ""), + "status": record.get("status", ""), + "manager_id": record.get("managerId", ""), + "hourly_rate": record.get("hourlyRate", 0.0), + "salary": record.get("salary", 0.0), + "created_at": record.get("createdAt", ""), + "updated_at": record.get("updatedAt", ""), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def __map_department_data(record): + """ + Transform API response record to department table schema format. + This function maps raw API department fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for department data. + + Returns: + dict: Transformed department record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "name": record.get("name", ""), + "description": record.get("description", ""), + "manager_id": record.get("managerId", ""), + "parent_department_id": record.get("parentDepartmentId", ""), + "location": record.get("location", ""), + "budget": record.get("budget", 0.0), + "cost_center": record.get("costCenter", ""), + "created_at": record.get("createdAt", ""), + "updated_at": record.get("updatedAt", ""), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def __map_skill_data(record): + """ + Transform API response record to skill table schema format. + This function maps raw API skill fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for skill data. + + Returns: + dict: Transformed skill record ready for database insertion. 
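+
+    Example (illustrative input/output):
+        An API record such as {"id": "42", "name": "Barista", "certificationRequired": True}
+        becomes {"id": "42", "name": "Barista", ..., "certification_required": True} plus a "synced_at" UTC timestamp.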
+ """ + return { + "id": record.get("id", ""), + "name": record.get("name", ""), + "description": record.get("description", ""), + "category": record.get("category", ""), + "level": record.get("level", ""), + "certification_required": record.get("certificationRequired", False), + "created_at": record.get("createdAt", ""), + "updated_at": record.get("updatedAt", ""), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def get_time_range(last_sync_time=None, configuration=None): + """ + Generate time range for incremental or initial data synchronization. + This function creates start and end timestamps for API queries based on sync state. + + Args: + last_sync_time: Timestamp of last successful sync (optional). + configuration: Configuration dictionary containing sync settings. + + Returns: + dict: Dictionary containing 'start' and 'end' timestamps in ISO format. + """ + end_time = datetime.now(timezone.utc).isoformat() + + if last_sync_time: + start_time = last_sync_time + else: + initial_sync_days = __get_config_int(configuration, "initial_sync_days", 90) + start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat() + + return {"start": start_time, "end": end_time} + + +def get_employees(api_key, params, last_sync_time=None, configuration=None): + """ + Fetch employee data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual employee records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + time_range = get_time_range(last_sync_time, configuration) + endpoint = "/employees" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + api_params = { + "limit": max_records, + "offset": 0, + "updatedAfter": time_range["start"], + "updatedBefore": time_range["end"], + } + api_params.update(params) + + offset = 0 + while True: + api_params["offset"] = offset + response = execute_api_request(endpoint, api_key, api_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_employee_data(record) + + if len(data) < max_records: + break + offset += max_records + + +def get_departments(api_key, params, last_sync_time=None, configuration=None): + """ + Fetch department data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual department records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. 
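+
+    Example (illustrative usage, as in the update function):
+        for department in get_departments(api_key, {}, last_sync_time, configuration):
+            op.upsert(table="departments", data=department)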
+ """ + time_range = get_time_range(last_sync_time, configuration) + endpoint = "/departments" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + api_params = { + "limit": max_records, + "offset": 0, + "updatedAfter": time_range["start"], + "updatedBefore": time_range["end"], + } + api_params.update(params) + + offset = 0 + while True: + api_params["offset"] = offset + response = execute_api_request(endpoint, api_key, api_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_department_data(record) + + if len(data) < max_records: + break + offset += max_records + + +def get_skills(api_key, params, last_sync_time=None, configuration=None): + """ + Fetch skills data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual skill records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + time_range = get_time_range(last_sync_time, configuration) + endpoint = "/skills" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + api_params = { + "limit": max_records, + "offset": 0, + "updatedAfter": time_range["start"], + "updatedBefore": time_range["end"], + } + api_params.update(params) + + offset = 0 + while True: + api_params["offset"] = offset + response = execute_api_request(endpoint, api_key, api_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_skill_data(record) + + if len(data) < max_records: + break + offset += max_records + + +def schema(configuration: dict): + """ + Define database schema with table names and primary keys for the connector. + This function specifies the destination tables and their primary keys for Fivetran to create. + + Args: + configuration: Configuration dictionary (not used but required by SDK). + + Returns: + list: List of table schema dictionaries with table names and primary keys. + """ + return [ + {"table": "employees", "primary_key": ["id"]}, + {"table": "departments", "primary_key": ["id"]}, + {"table": "skills", "primary_key": ["id"]}, + ] + + +def update(configuration: dict, state: dict): + """ + Main synchronization function that fetches and processes data from the Planday HR API. + This function orchestrates the entire sync process using memory-efficient streaming patterns. + + Args: + configuration: Configuration dictionary containing API credentials and settings. + state: State dictionary containing sync cursors and checkpoints from previous runs. + + Raises: + RuntimeError: If sync fails due to API errors or configuration issues. 
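+
+    Example (illustrative state shape written via op.checkpoint):
+        {"last_sync_time": "2025-01-01T00:00:00+00:00"}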
+    """
+    log.info("Starting Planday HR connector sync")
+
+    # Extract configuration parameters
+    api_key = __get_config_str(configuration, "api_key")
+    max_records_per_page = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000)
+    enable_incremental = __get_config_bool(configuration, "enable_incremental_sync", True)
+
+    # Get state for incremental sync
+    last_sync_time = state.get("last_sync_time") if enable_incremental else None
+
+    try:
+        # Fetch employees data using generator with incremental checkpointing
+        log.info("Fetching employee data...")
+        employee_count = 0
+        employee_page = 1
+
+        for employee in get_employees(api_key, {}, last_sync_time, configuration):
+            # The 'upsert' operation is used to insert or update data in the destination table.
+            # The op.upsert method is called with two arguments:
+            # - The first argument is the name of the table to upsert the data into.
+            # - The second argument is a dictionary containing the data to be upserted.
+            op.upsert(table="employees", data=employee)
+            employee_count += 1
+
+            # Checkpoint every page/batch to save progress incrementally
+            if employee_count % max_records_per_page == 0:
+                checkpoint_state = {
+                    # Fall back to the current time when the record carries no update timestamp
+                    "last_sync_time": employee.get("updated_at")
+                    or datetime.now(timezone.utc).isoformat(),
+                    "last_processed_employee_page": employee_page,
+                }
+                # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
+                # from the correct position in case of next sync or interruptions.
+                # Learn more about how and where to checkpoint by reading our best practices documentation
+                # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
+                op.checkpoint(checkpoint_state)
+                employee_page += 1
+
+        # Fetch departments data
+        log.info("Fetching department data...")
+        department_count = 0
+
+        for department in get_departments(api_key, {}, last_sync_time, configuration):
+            # The 'upsert' operation is used to insert or update data in the destination table.
+            # The op.upsert method is called with two arguments:
+            # - The first argument is the name of the table to upsert the data into.
+            # - The second argument is a dictionary containing the data to be upserted.
+            op.upsert(table="departments", data=department)
+            department_count += 1
+
+        # Fetch skills data
+        log.info("Fetching skills data...")
+        skill_count = 0
+
+        for skill in get_skills(api_key, {}, last_sync_time, configuration):
+            # The 'upsert' operation is used to insert or update data in the destination table.
+            # The op.upsert method is called with two arguments:
+            # - The first argument is the name of the table to upsert the data into.
+            # - The second argument is a dictionary containing the data to be upserted.
+            op.upsert(table="skills", data=skill)
+            skill_count += 1
+
+        # Final checkpoint with completion status
+        final_state = {"last_sync_time": datetime.now(timezone.utc).isoformat()}
+        # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
+        # from the correct position in case of next sync or interruptions.
+        # Learn more about how and where to checkpoint by reading our best practices documentation
+        # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
+        op.checkpoint(final_state)
+
+        log.info(
+            f"Sync completed successfully. Processed {employee_count} employees, {department_count} departments, {skill_count} skills."
+ ) + + except Exception as e: + log.severe(f"Sync failed: {str(e)}") + raise RuntimeError(f"Failed to sync data: {str(e)}") + + +# Connector instance +connector = Connector(update=update, schema=schema) + +# Check if the script is being run as the main module. +# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. +# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production. +# Please test using the Fivetran debug command prior to finalizing and deploying your connector. +if __name__ == "__main__": + # Open the configuration.json file and load its contents + with open("configuration.json", "r") as f: + configuration = json.load(f) + + # Test the connector locally + connector.debug(configuration=configuration) From f4fc9909788495b55c7239d5ce4d7d9b00468afe Mon Sep 17 00:00:00 2001 From: Karun Veluru Date: Tue, 11 Nov 2025 09:11:48 -0600 Subject: [PATCH 2/2] feature(examples): Address PR comments --- README.md | 2 +- connectors/planday/README.md | 19 +++---------------- connectors/planday/configuration.json | 4 +--- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 79e7fcaed..37caa327d 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ These connectors are ready to use out of the box, requiring minimal modification - [odata_version_4](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/odata_api/odata_version_4) - This is an example of how to sync data from an OData API version 4 using Connector SDK. - [odata_version_4_using_python_odata](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/odata_api/odata_version_4_using_python_odata) - This is an example of how to sync data from an OData API version 4 using python-odata python library. - [pindrop](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/pindrop) - This is an example of how to sync nightly report data from Pindrop using Connector SDK. You need to provide your Pindrop client ID and client Secret for this example to work. -- [planday](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/planday) - This example shows how to sync employee, department, and skills data from Planday HR API by using Connector SDK. You need to provide your Planday HR API key for this example to work. +- [planday](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/planday) - This example shows how to sync employee, department, and skills data from the Planday HR API by using the Connector SDK. You need to provide your Planday HR API key for this example to work. - [rabbitmq](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/rabbitmq) - This example shows how to sync messages from RabbitMQ queues using Connector SDK. It uses the `pika` library to connect to RabbitMQ and fetch messages from specified queues. You need to provide your RabbitMQ connection URL for this example to work. - Redshift - [simple_redshift_connector](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/redshift/simple_redshift_connector) - This example shows how to sync records from Redshift by using Connector SDK. You need to provide your Redshift credentials for this example to work. 
diff --git a/connectors/planday/README.md b/connectors/planday/README.md
index 134bbdc0d..8d10fec7a 100644
--- a/connectors/planday/README.md
+++ b/connectors/planday/README.md
@@ -26,25 +26,21 @@ Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/co
 ```json
 {
   "api_key": "",
-  "sync_frequency_hours": "",
   "initial_sync_days": "",
   "max_records_per_page": "",
   "request_timeout_seconds": "",
   "retry_attempts": "",
-  "enable_incremental_sync": "",
-  "enable_debug_logging": ""
+  "enable_incremental_sync": ""
 }
 ```
 
 ### Configuration parameters
 - `api_key` (required): Your Planday HR API authentication key
-- `sync_frequency_hours` (optional): How often to run incremental syncs, defaults to 4 hours
 - `initial_sync_days` (optional): Number of days to sync during initial sync, defaults to 90 days
 - `max_records_per_page` (optional): Maximum records per API request, defaults to 100 (range: 1-1000)
 - `request_timeout_seconds` (optional): HTTP request timeout in seconds, defaults to 30
 - `retry_attempts` (optional): Number of retry attempts for failed requests, defaults to 3
 - `enable_incremental_sync` (optional): Enable timestamp-based incremental sync, defaults to true
-- `enable_debug_logging` (optional): Enable detailed debug logging, defaults to false
 
 ## Requirements file
 This connector does not require any additional packages beyond those provided by the Fivetran environment.
@@ -61,10 +57,10 @@ Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre
 Note: The connector uses Bearer token authentication with automatic retry logic for failed requests. API keys are never logged or exposed in plain text for security.
 
 ## Pagination
-Offset-based pagination with automatic page traversal (refer to `get_employees`, `get_departments`, and `get_skills` functions). Generator-based processing prevents memory accumulation for large employee datasets. Processes pages sequentially while yielding individual records for immediate processing. Each endpoint supports configurable page sizes through the `max_records_per_page` setting.
+Offset-based pagination with automatic page traversal (refer to the `get_employees`, `get_departments`, and `get_skills` functions). Generator-based processing prevents memory accumulation for large employee datasets. Pages are processed sequentially, and individual records are yielded for immediate processing. Each endpoint supports configurable page sizes through the `max_records_per_page` setting.
 
 ## Data handling
-Employee, department, and skills data is mapped from Planday HR API format to normalized database columns (refer to the `__map_employee_data`, `__map_department_data`, and `__map_skill_data` functions). Nested objects are flattened, and all timestamps are converted to UTC format for consistency across different time zones.
+Employee, department, and skills data are mapped from Planday HR API format to normalized database columns (refer to the `__map_employee_data`, `__map_department_data`, and `__map_skill_data` functions). Nested objects are flattened, and all timestamps are converted to UTC format for consistency across different time zones.
 
 Supports timestamp-based incremental synchronization using the `last_sync_time` state parameter (refer to the `get_time_range` function). Initial sync can be configured to fetch up to 365 days of historical data. Memory-efficient streaming processes individual records without accumulating large datasets in memory.
@@ -84,14 +80,5 @@ Supports timestamp-based incremental synchronization using the `last_sync_time` Column types are automatically inferred by Fivetran. Sample columns include `first_name`, `last_name`, `email`, `department_id`, `hire_date`, `position`, `salary`, `hourly_rate`, `manager_id`, `name`, `description`, `budget`, `cost_center`, `category`, `level`, `certification_required`. -## Additional files - -The connector includes several additional files to support functionality, testing, and deployment: - -- `requirements.txt` – Python dependency specification for Planday HR API integration and connector requirements including faker for mock testing. - -- `configuration.json` – Configuration template for API credentials and connector parameters (should be excluded from version control). - - ## Additional considerations The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. \ No newline at end of file diff --git a/connectors/planday/configuration.json b/connectors/planday/configuration.json index 0b8466225..c5c171055 100644 --- a/connectors/planday/configuration.json +++ b/connectors/planday/configuration.json @@ -1,10 +1,8 @@ { "api_key": "", - "sync_frequency_hours": "", "initial_sync_days": "", "max_records_per_page": "", "request_timeout_seconds": "", "retry_attempts": "", - "enable_incremental_sync": "", - "enable_debug_logging": "" + "enable_incremental_sync": "" } \ No newline at end of file