From be40f37d308e179752358df20cb64fc17029d748 Mon Sep 17 00:00:00 2001 From: himanshudube97 Date: Wed, 25 Dec 2024 01:38:03 +0530 Subject: [PATCH 1/4] added tests for TestCreateSource --- .gitignore | 3 + .../test_airbyte_integration.py | 277 +++++++++++------- 2 files changed, 173 insertions(+), 107 deletions(-) diff --git a/.gitignore b/.gitignore index 821a5df76..3f552f153 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,9 @@ __pycache__/ *.py[cod] *$py.class +#this is the pytest integreation tests config json +integration.config.json + # C extensions *.so diff --git a/ddpui/tests/integration_tests/test_airbyte_integration.py b/ddpui/tests/integration_tests/test_airbyte_integration.py index be96dca2a..4bfa6cb52 100644 --- a/ddpui/tests/integration_tests/test_airbyte_integration.py +++ b/ddpui/tests/integration_tests/test_airbyte_integration.py @@ -1,3 +1,4 @@ +import json import os import django from pydantic import ValidationError @@ -123,134 +124,196 @@ def test_workspace_id(): class TestAirbyteSource: """class which holds all the source tests""" - source_config = { - "url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv", - "format": "csv", - "provider": {"storage": "HTTPS"}, - "dataset_name": "covid19data", - } + def load_configurations(): + """Loads source configurations from a JSON file.""" + config_file_path = os.getenv("INTEGRATION_TESTS_PATH") # this is the path of the json file. + with open(config_file_path, "r") as config_file: + return json.load(config_file) def test_source_connection(self, test_workspace_id): """tests connectivity to a source""" + source_configs = self.load_configurations() source_definitions = get_source_definitions(workspace_id=test_workspace_id)[ "sourceDefinitions" ] - for source_definition in source_definitions: - if source_definition["name"] == "File (CSV, JSON, Excel, Feather, Parquet)": - TestAirbyteSource.source_definition_id = source_definition["sourceDefinitionId"] - break - try: - res = check_source_connection( - test_workspace_id, - AirbyteSourceCreate( - name="unused", - sourceDefId=TestAirbyteSource.source_definition_id, - config=self.source_config, - ), - ) - CheckSourceConnectionTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + for source_config in source_configs: + source_name = source_config["source_name"] + config = source_config["config"] + + # Finiding source definition for the current source + source_definition = None + for sd in source_definitions: + if sd["name"] == source_name: + source_definition = sd + break + + if not source_definition: + raise ValueError(f"Source definition '{source_name}' not found.") + + source_definition_id = source_definition["sourceDefinitionId"] + + # Check the source connection + try: + res = check_source_connection( + test_workspace_id, + AirbyteSourceCreate( + name=f"source_{source_name.lower().replace(' ', '_')}", + sourceDefId=source_definition_id, + config=config, + ), + ) + CheckSourceConnectionTestResponse(**res) + print(f"Successfully connected to source: {source_name}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for source '{source_name}': {error.errors()}" + ) + + def test_a_create_source(self, test_workspace_id): + """Tests source creation dynamically for each source.""" + source_configs = self.load_configurations() + source_definitions = get_source_definitions(workspace_id=test_workspace_id)[ + "sourceDefinitions" + ] - def test_a_create_source(self, test_workspace_id): # skipcq: PYL-R0201 - """tests source creation""" - payload = { - "sourcedef_id": TestAirbyteSource.source_definition_id, - "config": TestAirbyteSource.source_config, - "workspace_id": str(test_workspace_id), - "name": "source1", - } - try: - CreateSourceTestPayload(**payload) - except ValidationError as error: - raise ValueError(f"Field do not match in payload: {error.errors()}") from error - try: - res = create_source(**payload) - CreateSourceTestResponse(**res) - TestAirbyteSource.source_id = res["sourceId"] - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + TestAirbyteSource.created_sources = [] # Store created source IDs for later tests + + for source_config in source_configs: + source_name = source_config["source_name"] + config = source_config["config"] + + # Find the source definition + source_definition = None + for sd in source_definitions: + if sd["name"] == source_name: + source_definition = sd + break + + if not source_definition: + raise ValueError(f"Source definition '{source_name}' not found.") + + source_definition_id = source_definition["sourceDefinitionId"] + + # Build the payload for source creation + payload = { + "sourcedef_id": source_definition_id, + "config": config, + "workspace_id": str(test_workspace_id), + "name": f"source_{source_name.lower().replace(' ', '_')}", + } + + # Create the source + try: + CreateSourceTestPayload(**payload) + res = create_source(**payload) + CreateSourceTestResponse(**res) + TestAirbyteSource.created_sources.append( + {"source_id": res["sourceId"], "config": config} + ) + print(f"Successfully created source: {source_name} with ID: {res['sourceId']}") + except ValidationError as error: + raise ValueError(f"Error creating source '{source_name}': {error.errors()}") def test_source_connection_for_update(self): - """tests connectivity to a source while editing""" - try: - res = check_source_connection_for_update( - TestAirbyteSource.source_id, - AirbyteSourceUpdateCheckConnection( - name="unused", - config=self.source_config, - ), - ) - CheckSourceConnectionTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error - - def test_get_definitions(self, test_workspace_id): # skipcq: PYL-R0201 - """tests retrieval of source definitions""" + """Tests connectivity for updating all sources dynamically.""" + for source in TestAirbyteSource.created_sources: + source_id = source["source_id"] + config = source["config"] + source_name = source["name"] + + try: + res = check_source_connection_for_update( + source_id, + AirbyteSourceUpdateCheckConnection( + name=source_name, + config=config, + ), + ) + CheckSourceConnectionTestResponse(**res) + print(f"Successfully tested connection for update on source ID: {source_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for source ID '{source_id}': {error.errors()}" + ) + + def test_get_definitions(self, test_workspace_id): + """Tests retrieval of source definitions.""" try: res = get_source_definitions(workspace_id=test_workspace_id)["sourceDefinitions"] GetSourceDefinitionsTestResponse(__root__=res) + print("Successfully retrieved source definitions.") except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error - - def test_get_source_schema_catalog(self, test_workspace_id): # skipcq: PYL-R0201 - """fetches the schema catalog for a source""" - try: - res = get_source_schema_catalog(test_workspace_id, TestAirbyteSource.source_id) - GetSourceSchemaCatalogTestResponse(catalog=res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error - - def test_fail_to_get_source_schema_catalog(self, test_workspace_id): # skipcq: PYL-R0201 - """fetches the schema catalog for a source""" + raise ValueError(f"Response validation failed: {error.errors()}") + + def test_get_source_schema_catalog(self, test_workspace_id): + """Fetches the schema catalog for all created sources.""" + for source in TestAirbyteSource.created_sources: + source_id = source["source_id"] + try: + res = get_source_schema_catalog(test_workspace_id, source_id) + GetSourceSchemaCatalogTestResponse(catalog=res) + print(f"Successfully retrieved schema catalog for source ID: {source_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for source ID '{source_id}': {error.errors()}" + ) + + def test_fail_to_get_source_schema_catalog(self, test_workspace_id): + """Fetches the schema catalog for a non-existent source to validate failure.""" try: get_source_schema_catalog(test_workspace_id, "not-a-source-id") except HttpError: - pass - - def test_get_source(self, test_workspace_id): # skipcq: PYL-R0201 - """tests retrieval of a single source""" - try: - res = get_source( - workspace_id=test_workspace_id, - source_id=TestAirbyteSource.source_id, - ) - GetSourceTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error - - def test_get_sources(self, test_workspace_id): # skipcq: PYL-R0201 - """tests retrieval of all sources""" + print("Correctly failed to fetch schema catalog for invalid source ID.") + + def test_get_source(self, test_workspace_id): + """Tests retrieval of a single source.""" + for source in TestAirbyteSource.created_sources: + source_id = source["source_id"] + try: + res = get_source( + workspace_id=test_workspace_id, + source_id=source_id, + ) + GetSourceTestResponse(**res) + print(f"Successfully retrieved details for source ID: {source_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for source ID '{source_id}': {error.errors()}" + ) + + def test_get_sources(self, test_workspace_id): + """Tests retrieval of all sources.""" try: res = get_sources(workspace_id=test_workspace_id)["sources"] GetSourcesTestResponse(sources=res) + print("Successfully retrieved all sources.") except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error - - def test_update_source(self): # skipcq: PYL-R0201 - """tests updating a source""" - payload = { - "name": "source9", - "sourcedef_id": TestAirbyteSource.source_definition_id, - "source_id": TestAirbyteSource.source_id, - "config": { - "url": "https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv", - "format": "csv", - "provider": {"storage": "HTTPS"}, - "dataset_name": "covid19data", - }, - } - try: - UpdateSourceTestPayload(**payload) - except ValidationError as error: - raise ValueError(f"Field do not match in payload: {error.errors()}") from error - - try: - res = update_source(**payload) - UpdateSourceTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + raise ValueError(f"Response validation failed: {error.errors()}") + + def test_update_source(self): + """Tests updating all created sources dynamically.""" + for source in TestAirbyteSource.created_sources: + source_id = source["source_id"] + config = source["config"] + source_definition_id = source["sourceDefinitionId"] + + payload = { + "name": f"updated_{source_id}", + "sourcedef_id": source_definition_id, + "source_id": source_id, + "config": config, + } + + try: + UpdateSourceTestPayload(**payload) + res = update_source(**payload) + UpdateSourceTestResponse(**res) + print(f"Successfully updated source ID: {source_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for source ID '{source_id}': {error.errors()}" + ) @pytest.fixture(scope="session") From 5e99da44f7bf95ec41db2b424574a0985bae92be Mon Sep 17 00:00:00 2001 From: himanshudube97 Date: Wed, 25 Dec 2024 02:15:31 +0530 Subject: [PATCH 2/4] TestWorkspace done --- .env.template | 5 +- .../test_airbyte_integration.py | 132 ++++++++++-------- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/.env.template b/.env.template index 24e50104e..7590cfb9c 100644 --- a/.env.template +++ b/.env.template @@ -104,4 +104,7 @@ CLIENTS_DBT_MOUNT= LOGS_MOUNT= ADMIN_EMAIL= -ADMIN_DISCORD_WEBHOOK= \ No newline at end of file +ADMIN_DISCORD_WEBHOOK= + +#For airbyte integration testing we have a integration.config.file. So we have to define the path here. +INTEGRATION_TESTS_PATH= \ No newline at end of file diff --git a/ddpui/tests/integration_tests/test_airbyte_integration.py b/ddpui/tests/integration_tests/test_airbyte_integration.py index 4bfa6cb52..3865ef4bf 100644 --- a/ddpui/tests/integration_tests/test_airbyte_integration.py +++ b/ddpui/tests/integration_tests/test_airbyte_integration.py @@ -12,6 +12,16 @@ from ddpui.ddpairbyte.airbyte_service import * +def load_configurations(): + """Loads all configurations from a single JSON file.""" + config_file_path = os.getenv("INTEGRATION_TESTS_PATH") # file path defined in env file. + with open(config_file_path, "r") as config_file: + return json.load(config_file) + + +integration_configs = load_configurations() + + @pytest.mark.skip(reason="Skipping this test as airbyte integraion needs to be done") class TestDeleteSource: def test_create_workspace(self): # skipcq: PYL-R0201 @@ -65,51 +75,69 @@ def test_a_create_source(self, test_workspace_id): # skipcq: PYL-R0201 @pytest.mark.skip(reason="Skipping this test as airbyte integraion needs to be done") class TestWorkspace: - """class which holds all the workspace tests""" + """Class which holds all the workspace tests dynamically for multiple workspaces.""" - workspace_id = None + def test_create_workspace(self): + """Creates workspaces dynamically and checks airbyte response.""" + workspace_configs = integration_configs.get("workspaces", []) + TestWorkspace.created_workspaces = [] # Store created workspaces for later use - def test_create_workspace(self): # skipcq: PYL-R0201 - """creates a workspace, checks airbyte response""" - payload = {"name": "test_workspace"} - - try: - CreateWorkspaceTestPayload(**payload) - except ValidationError as error: - raise ValueError(f"Field do not match in the payload: {error.errors()}") from error + for workspace_config in workspace_configs: + payload = {"name": workspace_config["name"]} - try: - res = create_workspace(**payload) - CreateWorkspaceTestResponse(**res) - TestWorkspace.workspace_id = res["workspaceId"] - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + try: + CreateWorkspaceTestPayload(**payload) + res = create_workspace(**payload) + CreateWorkspaceTestResponse(**res) + TestWorkspace.created_workspaces.append( + {"workspace_id": res["workspaceId"], "name": workspace_config["name"]} + ) + print( + f"Successfully created workspace: {workspace_config['name']} with ID: {res['workspaceId']}" + ) + except ValidationError as error: + raise ValueError( + f"Error creating workspace '{workspace_config['name']}': {error.errors()}" + ) - def test_get_workspace(self): # skipcq: PYL-R0201 - """gets a workspace, checks airbyte response""" - try: - res = get_workspace(workspace_id=TestWorkspace.workspace_id) - GetWorkspaceTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + def test_get_workspace(self): + """Gets details for all created workspaces.""" + for workspace in TestWorkspace.created_workspaces: + workspace_id = workspace["workspace_id"] + try: + res = get_workspace(workspace_id=workspace_id) + GetWorkspaceTestResponse(**res) + print(f"Successfully retrieved details for workspace ID: {workspace_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for workspace ID '{workspace_id}': {error.errors()}" + ) - def test_get_workspaces(self): # skipcq: PYL-R0201 - """gets all workspaces, checks airbyte response""" + def test_get_workspaces(self): + """Gets all workspaces and checks airbyte response.""" try: res = get_workspaces() GetWorkspacesTestResponse(**res) + print("Successfully retrieved all workspaces.") except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + raise ValueError(f"Response validation failed: {error.errors()}") - def test_set_workspace_name(self): # skipcq: PYL-R0201 - """sets workspace name, checks airbyte response""" - new_name = "test" + def test_set_workspace_name(self): + """Sets new names for all created workspaces.""" + for workspace in TestWorkspace.created_workspaces: + workspace_id = workspace["workspace_id"] + new_name = f"{workspace['name']}_updated" - try: - res = set_workspace_name(workspace_id=TestWorkspace.workspace_id, name=new_name) - SetWorkspaceTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + payload = {"workspace_id": workspace_id, "name": new_name} + + try: + res = set_workspace_name(**payload) + SetWorkspaceTestResponse(**res) + print(f"Successfully updated workspace ID: {workspace_id} to new name: {new_name}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for workspace ID '{workspace_id}': {error.errors()}" + ) @pytest.fixture(scope="session") @@ -124,32 +152,26 @@ def test_workspace_id(): class TestAirbyteSource: """class which holds all the source tests""" - def load_configurations(): - """Loads source configurations from a JSON file.""" - config_file_path = os.getenv("INTEGRATION_TESTS_PATH") # this is the path of the json file. - with open(config_file_path, "r") as config_file: - return json.load(config_file) - def test_source_connection(self, test_workspace_id): """tests connectivity to a source""" - source_configs = self.load_configurations() + source_configs = integration_configs.get("sources", []) source_definitions = get_source_definitions(workspace_id=test_workspace_id)[ "sourceDefinitions" ] for source_config in source_configs: - source_name = source_config["source_name"] + name = source_config["name"] config = source_config["config"] # Finiding source definition for the current source source_definition = None for sd in source_definitions: - if sd["name"] == source_name: + if sd["name"] == name: source_definition = sd break if not source_definition: - raise ValueError(f"Source definition '{source_name}' not found.") + raise ValueError(f"Source definition '{name}' not found.") source_definition_id = source_definition["sourceDefinitionId"] @@ -158,16 +180,16 @@ def test_source_connection(self, test_workspace_id): res = check_source_connection( test_workspace_id, AirbyteSourceCreate( - name=f"source_{source_name.lower().replace(' ', '_')}", + name=f"source_{name.lower().replace(' ', '_')}", sourceDefId=source_definition_id, config=config, ), ) CheckSourceConnectionTestResponse(**res) - print(f"Successfully connected to source: {source_name}") + print(f"Successfully connected to source: {name}") except ValidationError as error: raise ValueError( - f"Response validation failed for source '{source_name}': {error.errors()}" + f"Response validation failed for source '{name}': {error.errors()}" ) def test_a_create_source(self, test_workspace_id): @@ -180,18 +202,18 @@ def test_a_create_source(self, test_workspace_id): TestAirbyteSource.created_sources = [] # Store created source IDs for later tests for source_config in source_configs: - source_name = source_config["source_name"] + name = source_config["name"] config = source_config["config"] # Find the source definition source_definition = None for sd in source_definitions: - if sd["name"] == source_name: + if sd["name"] == name: source_definition = sd break if not source_definition: - raise ValueError(f"Source definition '{source_name}' not found.") + raise ValueError(f"Source definition '{name}' not found.") source_definition_id = source_definition["sourceDefinitionId"] @@ -200,7 +222,7 @@ def test_a_create_source(self, test_workspace_id): "sourcedef_id": source_definition_id, "config": config, "workspace_id": str(test_workspace_id), - "name": f"source_{source_name.lower().replace(' ', '_')}", + "name": f"source_{name.lower().replace(' ', '_')}", } # Create the source @@ -211,22 +233,22 @@ def test_a_create_source(self, test_workspace_id): TestAirbyteSource.created_sources.append( {"source_id": res["sourceId"], "config": config} ) - print(f"Successfully created source: {source_name} with ID: {res['sourceId']}") + print(f"Successfully created source: {name} with ID: {res['sourceId']}") except ValidationError as error: - raise ValueError(f"Error creating source '{source_name}': {error.errors()}") + raise ValueError(f"Error creating source '{name}': {error.errors()}") def test_source_connection_for_update(self): """Tests connectivity for updating all sources dynamically.""" for source in TestAirbyteSource.created_sources: source_id = source["source_id"] config = source["config"] - source_name = source["name"] + name = source["name"] try: res = check_source_connection_for_update( source_id, AirbyteSourceUpdateCheckConnection( - name=source_name, + name=name, config=config, ), ) From 2e6f5eda4bf43a648e28c208ce25fcfb047668a8 Mon Sep 17 00:00:00 2001 From: himanshudube97 Date: Wed, 25 Dec 2024 02:47:30 +0530 Subject: [PATCH 3/4] added test for Connections too --- .../test_airbyte_integration.py | 193 ++++++++++-------- 1 file changed, 107 insertions(+), 86 deletions(-) diff --git a/ddpui/tests/integration_tests/test_airbyte_integration.py b/ddpui/tests/integration_tests/test_airbyte_integration.py index 3865ef4bf..eb0a396ca 100644 --- a/ddpui/tests/integration_tests/test_airbyte_integration.py +++ b/ddpui/tests/integration_tests/test_airbyte_integration.py @@ -458,103 +458,124 @@ def test_destination_id(test_workspace_id): @pytest.mark.skip(reason="Skipping this test as airbyte integraion needs to be done") class TestConnection: - def test_a_create_connection( - self, - test_workspace_id, - test_source_id, - test_destination_id, - ): # skipcq: PYL-R0201 + def test_a_create_connections(self, test_workspace_id, test_destination_id): + """Creates connections dynamically for all sources and checks responses.""" workspace_id = str(test_workspace_id) - connection_info = schema.AirbyteConnectionCreate( - sourceId=str(test_source_id), - destinationId=str(test_destination_id), - name="Test Connection", - streams=[ - { - "name": "covid19data", - "selected": True, - "syncMode": "full_refresh", - "destinationSyncMode": "overwrite", - "cursorField": "default", - } - ], - ) + sources = get_sources(workspace_id=workspace_id)["sources"] # getting all the sources. + TestConnection.created_connections = [] # Store created connection IDs for later tests - try: - res = create_connection(workspace_id, connection_info) - CreateConnectionTestResponse(**res) - TestConnection.connection_id = res["connectionId"] - # check if the streams have been set in the connection - conn = get_connection(workspace_id, res["connectionId"]) - assert conn is not None - assert "syncCatalog" in conn - assert "streams" in conn["syncCatalog"] - assert len(conn["syncCatalog"]["streams"]) == len(connection_info.streams) - - for stream in conn["syncCatalog"]["streams"]: - assert "config" in stream - assert stream["config"]["selected"] is True - assert stream["config"]["cursorField"] == [] + for source in sources: + source_id = source["sourceId"] + source_name = source["name"] - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + # Fetch streams dynamically for the source + streams = get_source_schema_catalog( + test_workspace_id, source_id + ) # this fetches the schema catalog for the source. + GetSourceSchemaCatalogTestResponse(catalog=res)(workspace_id, source_id) - def test_get_connection(self, test_workspace_id): # skipcq: PYL-R0201 - workspace_id = test_workspace_id - connection_id = TestConnection.connection_id + if not streams: + print( + f"No streams found for source: {source_name} ({source_id}). Skipping connection creation." + ) + continue - try: - res = get_connection(workspace_id, connection_id) - GetConnectionTestResponse(**res) - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + connection_info = schema.AirbyteConnectionCreate( + sourceId=str(source_id), + destinationId=str(test_destination_id), + name=f"Connection_for_{source_name}", + streams=streams, + ) - def test_update_connection( - self, test_workspace_id, test_source_id, test_destination_id - ): # skipcq: PYL-R0201 + try: + # Create the connection + res = create_connection( + workspace_id, connection_info + ) # so we fetched all sources then, for each source we fetched streams and then created connection. + CreateConnectionTestResponse(**res) + TestConnection.created_connections.append( + { + "connection_id": res["connectionId"], + "source_id": source_id, + "streams": streams, + } + ) + + # Validate the created connection + conn = get_connection(workspace_id, res["connectionId"]) + assert conn is not None + assert "syncCatalog" in conn + assert "streams" in conn["syncCatalog"] + assert len(conn["syncCatalog"]["streams"]) == len(streams) + + print(f"Successfully created connection for source: {source_name} ({source_id})") + + except ValidationError as error: + raise ValueError( + f"Response validation failed for connection to source '{source_name}': {error.errors()}" + ) + + def test_get_connection(self, test_workspace_id): + """Gets details for all created connections.""" workspace_id = test_workspace_id - current_connection = get_connection(workspace_id, TestConnection.connection_id) - connection_info = schema.AirbyteConnectionUpdate( - sourceId=test_source_id, - destinationId=test_destination_id, - connectionId=TestConnection.connection_id, - streams=[ - { - "name": "covid19data", - "selected": True, - "syncMode": "full_refresh", - "destinationSyncMode": "append", - "cursorField": ["default"], - } - ], - name="New Connection Name", - ) - try: - res = update_connection(workspace_id, connection_info, current_connection) - UpdateConnectionTestResponse(**res) + for connection in TestConnection.created_connections: + connection_id = connection["connection_id"] - # check if the streams have been set in the connection - conn = get_connection(workspace_id, res["connectionId"]) - assert conn is not None - assert "syncCatalog" in conn - assert "streams" in conn["syncCatalog"] - assert len(conn["syncCatalog"]["streams"]) == len(connection_info.streams) + try: + res = get_connection(workspace_id, connection_id) + GetConnectionTestResponse(**res) + print(f"Successfully retrieved connection ID: {connection_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for connection ID '{connection_id}': {error.errors()}" + ) - for stream in conn["syncCatalog"]["streams"]: - assert "config" in stream - assert stream["config"]["selected"] is True - assert stream["config"]["cursorField"] == [] + def test_update_connections(self, test_workspace_id, test_destination_id): + """Updates all created connections dynamically.""" + workspace_id = test_workspace_id - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + for connection in TestConnection.created_connections: + connection_id = connection["connection_id"] + source_id = connection["source_id"] + streams = connection["streams"] + + # Modify the streams (e.g., change destinationSyncMode to "append") + updated_streams = [] + for stream in streams: + new_stream = dict(stream) # Copy the original stream + new_stream["destinationSyncMode"] = "append" # Update destinationSyncMode + updated_streams.append(new_stream) + + connection_info = schema.AirbyteConnectionUpdate( + sourceId=source_id, + destinationId=test_destination_id, + connectionId=connection_id, + streams=updated_streams, + name=f"Updated Connection for {connection_id}", + ) + + try: + res = update_connection(workspace_id, connection_info, {}) + UpdateConnectionTestResponse(**res) + print(f"Successfully updated connection ID: {connection_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for connection ID '{connection_id}': {error.errors()}" + ) - def test_delete_connection(self, test_workspace_id): # skipcq: PYL-R0201 + def test_delete_connections(self, test_workspace_id): + """Deletes all created connections dynamically.""" workspace_id = test_workspace_id - connection_id = TestConnection.connection_id - try: - res = delete_connection(workspace_id, connection_id) - assert res == {} - except ValidationError as error: - raise ValueError(f"Response validation failed: {error.errors()}") from error + for connection in TestConnection.created_connections: + connection_id = connection["connection_id"] + + try: + res = delete_connection(workspace_id, connection_id) + assert res == {} + print(f"Successfully deleted connection ID: {connection_id}") + except ValidationError as error: + raise ValueError( + f"Response validation failed for connection ID '{connection_id}': {error.errors()}" + ) From 577520c9aee19aace309660b5d7028965eacaf81 Mon Sep 17 00:00:00 2001 From: himanshudube97 Date: Wed, 25 Dec 2024 02:56:04 +0530 Subject: [PATCH 4/4] fix --- ddpui/tests/integration_tests/test_airbyte_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddpui/tests/integration_tests/test_airbyte_integration.py b/ddpui/tests/integration_tests/test_airbyte_integration.py index eb0a396ca..aaf0efba6 100644 --- a/ddpui/tests/integration_tests/test_airbyte_integration.py +++ b/ddpui/tests/integration_tests/test_airbyte_integration.py @@ -19,7 +19,7 @@ def load_configurations(): return json.load(config_file) -integration_configs = load_configurations() +# integration_configs = load_configurations() #commenting the code for a while @pytest.mark.skip(reason="Skipping this test as airbyte integraion needs to be done")