diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/agents.cpython-312.pyc b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/agents.cpython-312.pyc
new file mode 100644
index 000000000..a6640fd08
Binary files /dev/null and b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/agents.cpython-312.pyc differ
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-312.pyc b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-312.pyc
new file mode 100644
index 000000000..a0199c1ca
Binary files /dev/null and b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-312.pyc differ
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-313.pyc b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-313.pyc
new file mode 100644
index 000000000..46112ba05
Binary files /dev/null and b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/__pycache__/knowledge_bases.cpython-313.pyc differ
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/agents.py b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/agents.py
index 5894f872c..37e17d3da 100644
--- a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/agents.py
+++ b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/agents.py
@@ -8,18 +8,26 @@
 import os
 import subprocess
 from pathlib import Path
+from datetime import datetime

 print(f"Boto3 version: {boto3.__version__}")

-iam_client = boto3.client('iam')
-sts_client = boto3.client('sts')
-s3_client = boto3.client('s3')
-session = boto3.session.Session()
+# Custom JSON encoder to handle datetime objects in trace data
+class DateTimeEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        return super().default(obj)
+
+iam_client = boto3.client('iam', region_name="us-west-2")
+sts_client = boto3.client('sts', region_name="us-west-2")
+s3_client = boto3.client('s3', region_name="us-west-2")
+session = boto3.session.Session(region_name="us-west-2")
 region = session.region_name
 account_id = sts_client.get_caller_identity()["Account"]
-dynamodb_client = boto3.client('dynamodb')
-dynamodb_resource = boto3.resource('dynamodb')
-lambda_client = boto3.client('lambda')
+dynamodb_client = boto3.client('dynamodb', region_name="us-west-2")
+dynamodb_resource = boto3.resource('dynamodb', region_name="us-west-2")
+lambda_client = boto3.client('lambda', region_name="us-west-2")
 # bedrock_agent_client = boto3.client('bedrock-agent')
 bedrock_agent_client = boto3.client(
     'bedrock-agent',
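Review note: the `DateTimeEncoder` added above exists because Bedrock agent traces may carry `datetime` values, which the stock `json` module cannot serialize. A minimal sketch of the failure mode and the fix (the trace payload here is hypothetical):

```python
import json
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        # Serialize datetime values as ISO-8601 strings; defer everything else
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

trace = {"event": "modelInvocationInput", "timestamp": datetime.now()}  # hypothetical payload
# json.dumps(trace) would raise: TypeError: Object of type datetime is not JSON serializable
print(json.dumps(trace, indent=2, cls=DateTimeEncoder))
```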
@@ -728,13 +736,23 @@ def invoke_agent_helper(
                 for key in event['chunk']:
                     if key != 'bytes':
                         logger.info(f"Chunck {key}:\n")
-                        logger.info(json.dumps(event['chunk'][key], indent=3))
+                        try:
+                            logger.info(json.dumps(event['chunk'][key], indent=3, cls=DateTimeEncoder))
+                        except Exception as chunk_error:
+                            logger.warning(f"Could not serialize chunk data: {chunk_error}")
+                            logger.info(f"Chunk data: {str(event['chunk'][key])}")
             agent_answer = data.decode('utf8')
             return agent_answer
             # End event indicates that the request finished successfully
         elif 'trace' in event:
             if enable_trace:
-                logger.info(json.dumps(event['trace'], indent=2))
+                try:
+                    logger.info(json.dumps(event['trace'], indent=2, cls=DateTimeEncoder))
+                except Exception as trace_error:
+                    logger.warning(f"Could not serialize trace data: {trace_error}")
+                    logger.info(f"Trace event type: {type(event['trace'])}")
+                    # Fallback to string representation
+                    logger.info(f"Trace data: {str(event['trace'])}")
         else:
             raise Exception("unexpected event.", event)
     except Exception as e:
@@ -772,7 +790,7 @@ def create_lambda_layer(agent_name, requirements):
     ], cwd="layer")

     # Create Lambda layer
-    lambda_client = boto3.client('lambda')
+    lambda_client = boto3.client('lambda', region_name="us-west-2")

     try:
         with open("layer/layer.zip", 'rb') as zip_file:
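Review note: every client in `agents.py` is now pinned to `us-west-2`, while `region = session.region_name` still feeds the rest of the module. If the pinning is intentional, routing it through a single variable keeps the two from drifting apart; a sketch (the environment-variable fallback is an assumption, not something this change does):

```python
import os
import boto3

# One source of truth for the region; "us-west-2" mirrors the value hardcoded in this diff
REGION = os.environ.get("AWS_REGION", "us-west-2")

session = boto3.session.Session(region_name=REGION)
iam_client = session.client('iam')
lambda_client = session.client('lambda')
```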
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/custom_orchestration_example.ipynb b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/custom_orchestration_example.ipynb
index 160424b74..2530979fc 100644
--- a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/custom_orchestration_example.ipynb
+++ b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/custom_orchestration_example.ipynb
@@ -87,9 +87,9 @@
    },
    "outputs": [],
    "source": [
-    "s3_client = boto3.client('s3')\n",
-    "sts_client = boto3.client('sts')\n",
-    "session = boto3.session.Session()\n",
+    "s3_client = boto3.client('s3', region_name=\"us-west-2\")\n",
+    "sts_client = boto3.client('sts', region_name=\"us-west-2\")\n",
+    "session = boto3.session.Session(region_name=\"us-west-2\")\n",
     "region = session.region_name\n",
     "account_id = sts_client.get_caller_identity()[\"Account\"]\n",
     "knowledge_base_name = f'restaurant-kb'\n",
@@ -112,7 +112,7 @@
     "    knowledge_base_name,\n",
     "    knowledge_base_description,\n",
     "    bucket_name\n",
-    ")"
+    ")\n"
    ]
   },
   {
@@ -390,7 +390,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "\n",
@@ -437,7 +437,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "\n",
@@ -478,7 +478,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "\n",
@@ -619,7 +619,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "session_state={\n",
@@ -668,7 +668,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "session_state={\n",
@@ -713,7 +713,7 @@
    },
    "outputs": [],
    "source": [
-    "%%time\n",
+    "import time\n",
     "import uuid\n",
     "session_id:str = str(uuid.uuid1())\n",
     "query = \"What do you serve for dinner? can you make a reservation for 4 people, at 9pm tonight.\"\n",
@@ -825,12 +825,12 @@
    },
    "outputs": [],
    "source": [
-    "# clean up react agent\n",
-    "clean_up_resources(\n",
-    "    agent_name_react,\n",
-    "    custom_orchestration_lambda_function_name=None,\n",
-    "    dynamodb_table=f'{agent_name_react}-table'\n",
-    ")"
+    "# # clean up react agent\n",
+    "# clean_up_resources(\n",
+    "#     agent_name_react,\n",
+    "#     custom_orchestration_lambda_function_name=None,\n",
+    "#     dynamodb_table=f'{agent_name_react}-table'\n",
+    "# )"
    ]
   },
   {
@@ -840,12 +840,12 @@
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
-    "# clean up rewoo agent\n",
-    "clean_up_resources(\n",
-    "    agent_name_rewoo,\n",
-    "    custom_orchestration_lambda_function_name=custom_orchestration_lambda_name,\n",
-    "    dynamodb_table=f'{agent_name_rewoo}-table'\n",
-    ")"
+    "# # clean up rewoo agent\n",
+    "# clean_up_resources(\n",
+    "#     agent_name_rewoo,\n",
+    "#     custom_orchestration_lambda_function_name=custom_orchestration_lambda_name,\n",
+    "#     dynamodb_table=f'{agent_name_rewoo}-table'\n",
+    "# )"
    ]
   },
   {
@@ -857,10 +857,10 @@
    },
    "outputs": [],
    "source": [
-    "# delete kb\n",
-    "kb.delete_kb(\n",
-    "    kb_name=knowledge_base_name, delete_s3_bucket=True, delete_iam_roles_and_policies=True\n",
-    ")"
+    "# # delete kb\n",
+    "# kb.delete_kb(\n",
+    "#     kb_name=knowledge_base_name, delete_s3_bucket=True, delete_iam_roles_and_policies=True\n",
+    "# )"
    ]
   },
   {
@@ -1491,7 +1491,7 @@
   ],
   "instance_type": "ml.t3.medium",
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "3.12.5",
    "language": "python",
    "name": "python3"
   },
@@ -1505,7 +1505,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.11.11"
+   "version": "3.12.5"
   }
  },
 "nbformat": 4,
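Review note: these notebook cells swap the `%%time` cell magic for a bare `import time`, which by itself measures nothing. If wall-clock timing is still wanted under the new kernel, an explicit sketch (the invoke call stands in for the original cell body):

```python
import time
import uuid

session_id: str = str(uuid.uuid1())

start = time.perf_counter()
# ... original cell body, e.g. invoke_agent_helper(query, session_id, agent_id, alias_id) ...
elapsed = time.perf_counter() - start
print(f"Cell finished in {elapsed:.2f}s")
```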
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/knowledge_bases.py b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/knowledge_bases.py
index ed0b411a6..e60ecbf16 100644
--- a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/knowledge_bases.py
+++ b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/knowledge_bases.py
@@ -24,11 +24,21 @@
 import json
 import boto3
 import time
-from botocore.exceptions import ClientError
+import logging
+from botocore.exceptions import ClientError, BotoCoreError
 from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError
 import pprint
 from retrying import retry
 import random
+import requests
+from requests_aws4auth import AWS4Auth
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)

 valid_embedding_models = [
     "cohere.embed-multilingual-v3", "cohere.embed-english-v3", "amazon.titan-embed-text-v1",
@@ -50,6 +60,56 @@ def interactive_sleep(seconds: int):
         time.sleep(1)


+def retry_with_iam_propagation(max_retries=3, base_wait_time=30):
+    """
+    Decorator to retry operations that may fail due to IAM permission propagation delays
+    Args:
+        max_retries: Maximum number of retry attempts
+        base_wait_time: Base wait time in seconds between retries
+    """
+    def decorator(func):
+        def wrapper(*args, **kwargs):
+            last_exception = None
+            for attempt in range(max_retries + 1):
+                try:
+                    return func(*args, **kwargs)
+                except (ClientError, BotoCoreError) as e:
+                    last_exception = e
+                    # e.response is a dict, so the error code must be read with .get(), not getattr()
+                    error_code = e.response.get('Error', {}).get('Code', 'Unknown') if hasattr(e, 'response') else 'Unknown'
+
+                    # Check if this is an IAM-related error that might benefit from retry
+                    iam_related_errors = [
+                        'AccessDeniedException',
+                        'UnauthorizedOperation',
+                        'InvalidUserID.NotFound',
+                        'ValidationException'  # Sometimes IAM propagation causes validation errors
+                    ]
+
+                    if attempt < max_retries and error_code in iam_related_errors:
+                        wait_time = base_wait_time * (2 ** attempt)  # Exponential backoff
+                        logger.warning(f"IAM-related error detected (attempt {attempt + 1}/{max_retries + 1}): {e}")
+                        logger.info(f"Waiting {wait_time} seconds for IAM permission propagation before retry...")
+                        interactive_sleep(wait_time)
+                        continue
+                    else:
+                        raise
+                except Exception as e:
+                    last_exception = e
+                    if attempt < max_retries:
+                        wait_time = base_wait_time * (2 ** attempt)
+                        logger.warning(f"Unexpected error (attempt {attempt + 1}/{max_retries + 1}): {e}")
+                        logger.info(f"Waiting {wait_time} seconds before retry...")
+                        interactive_sleep(wait_time)
+                        continue
+                    else:
+                        raise
+
+            # If we get here, all retries failed
+            raise last_exception
+        return wrapper
+    return decorator
+
+
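Review note: `retry_with_iam_propagation` is defined but, in the hunks shown, never applied to anything. A hypothetical application (the wrapped function below is illustrative, not part of this diff):

```python
@retry_with_iam_propagation(max_retries=3, base_wait_time=30)
def get_execution_role(iam_client, role_name: str):
    # Early attempts can raise AccessDeniedException while a freshly
    # attached policy propagates; the decorator retries with exponential backoff.
    return iam_client.get_role(RoleName=role_name)
```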
 class KnowledgeBasesForAmazonBedrock:
     """
     Support class that allows for:
@@ -63,20 +123,20 @@ def __init__(self, suffix=None):
         """
         Class initializer
         """
-        boto3_session = boto3.session.Session()
+        boto3_session = boto3.session.Session(region_name="us-west-2")
         self.region_name = boto3_session.region_name
         self.iam_client = boto3_session.client('iam')
-        self.account_number = boto3.client('sts').get_caller_identity().get('Account')
+        self.account_number = boto3_session.client('sts').get_caller_identity().get('Account')
         self.suffix = random.randrange(200, 900)
-        self.identity = boto3.client('sts').get_caller_identity()['Arn']
-        self.aoss_client = boto3_session.client('opensearchserverless')
-        self.s3_client = boto3.client('s3')
-        self.bedrock_agent_client = boto3.client('bedrock-agent')
-        self.bedrock_agent_client = boto3.client(
+        self.identity = boto3_session.client('sts').get_caller_identity()['Arn']
+        self.aoss_client = boto3_session.client('opensearchserverless', region_name="us-west-2")
+        self.s3_client = boto3_session.client('s3', region_name="us-west-2")
+        self.bedrock_agent_client = boto3_session.client('bedrock-agent', region_name="us-west-2")
+        self.bedrock_agent_client = boto3_session.client(
             'bedrock-agent',
             region_name=self.region_name
         )
-        credentials = boto3.Session().get_credentials()
+        credentials = boto3_session.get_credentials()
         self.awsauth = AWSV4SignerAuth(credentials, self.region_name, 'aoss')
         self.oss_client = None

@@ -85,8 +145,23 @@ def create_or_retrieve_knowledge_base(
             kb_name: str,
             kb_description: str = None,
             data_bucket_name: str = None,
-            embedding_model: str = "amazon.titan-embed-text-v2:0"
+            embedding_model: str = "amazon.titan-embed-text-v2:0",
+            use_native_vector_store: bool = False
     ):
+        """
+        Function used to create a new Knowledge Base or retrieve an existing one.
+
+        Args:
+            kb_name: Knowledge Base Name
+            kb_description: Knowledge Base Description
+            data_bucket_name: Name of s3 Bucket containing Knowledge Base Data
+            embedding_model: Name of Embedding model to be used on Knowledge Base creation
+            use_native_vector_store: If True, uses Bedrock's built-in vector store instead of OpenSearch
+
+        Returns:
+            kb_id: str - Knowledge base id
+            ds_id: str - Data Source id
+        """
         """
         Function used to create a new Knowledge Base or retrieve an existent one

@@ -121,6 +196,13 @@ def create_or_retrieve_knowledge_base(
             print(f"Retrieved Data Source Id: {ds_id}")
         else:
             print(f"Creating KB {kb_name}")
+
+            # Check if we should use Bedrock's native vector store
+            if use_native_vector_store:
+                print("Using Bedrock's native vector store instead of OpenSearch Serverless")
+                return self.create_knowledge_base_with_native_vector_store(
+                    kb_name, kb_description, data_bucket_name, embedding_model
+                )
             # self.kb_name = kb_name
             # self.kb_description = kb_description
             if data_bucket_name is None:
@@ -159,7 +241,16 @@ def create_or_retrieve_knowledge_base(
             host, collection, collection_id, collection_arn = self.create_oss(
                 vector_store_name, oss_policy_name, bedrock_kb_execution_role
             )
-            # Build the OpenSearch client
+            # Wait for the collection and permissions to fully propagate
+            logger.info("Waiting for OpenSearch collection and permissions to propagate (180 seconds)...")
+            print("Waiting for OpenSearch collection and permissions to propagate...")
+            print("This includes collection creation, data access policies, and IAM role permissions...")
+            interactive_sleep(180)  # Increased wait time for better propagation
+            logger.info("✓ OpenSearch permission propagation wait completed")
+
+            # Build the OpenSearch client with fresh credentials
+            credentials = boto3.session.Session(region_name="us-west-2").get_credentials()
+            self.awsauth = AWSV4SignerAuth(credentials, self.region_name, 'aoss')
             self.oss_client = OpenSearch(
                 hosts=[{'host': host, 'port': 443}],
                 http_auth=self.awsauth,
@@ -171,7 +262,12 @@
             print("========================================================================================")
             print(f"Step 5 - Creating OSS Vector Index")
-            self.create_vector_index(index_name)
+            self.create_vector_index(index_name, embedding_model)
+
+            # Additional wait to ensure index is fully ready before creating knowledge base
+            print("Waiting for index to be fully ready before creating knowledge base...")
+            interactive_sleep(60)
+
             print("========================================================================================")
             print(f"Step 6 - Creating Knowledge Base")
             knowledge_base, data_source = self.create_knowledge_base(
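Review note: with the new signature, a caller drives the whole flow through one method; a condensed sketch of the happy path (names follow the notebook, the bucket name is hypothetical):

```python
kb = KnowledgeBasesForAmazonBedrock()

kb_id, ds_id = kb.create_or_retrieve_knowledge_base(
    kb_name="restaurant-kb",
    kb_description="Knowledge base for the restaurant assistant",  # hypothetical description
    data_bucket_name="restaurant-kb-123",                          # hypothetical bucket
    embedding_model="amazon.titan-embed-text-v2:0",
)
```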
@@ -190,20 +286,43 @@ def create_s3_bucket(self, bucket_name: str):
             Args:
                 bucket_name: s3 bucket name
         """
+        logger.info(f"Checking if S3 bucket '{bucket_name}' exists...")
         try:
             self.s3_client.head_bucket(Bucket=bucket_name)
+            logger.info(f'✓ S3 bucket {bucket_name} already exists - retrieving it!')
             print(f'Bucket {bucket_name} already exists - retrieving it!')
         except ClientError as e:
-            print(f'Creating bucket {bucket_name}')
-            if self.region_name == "us-east-1":
-                self.s3_client.create_bucket(
-                    Bucket=bucket_name
-                )
+            error_code = e.response['Error']['Code']
+            if error_code == '404':
+                logger.info(f"S3 bucket '{bucket_name}' does not exist, creating it...")
+                print(f'Creating bucket {bucket_name}')
+                try:
+                    if self.region_name == "us-east-1":
+                        self.s3_client.create_bucket(Bucket=bucket_name)
+                    else:
+                        self.s3_client.create_bucket(
+                            Bucket=bucket_name,
+                            CreateBucketConfiguration={'LocationConstraint': self.region_name}
+                        )
+                    logger.info(f"✓ Successfully created S3 bucket '{bucket_name}'")
+                except ClientError as create_error:
+                    logger.error(f"✗ Failed to create S3 bucket '{bucket_name}': {create_error}")
+                    raise Exception(f"Failed to create S3 bucket: {create_error}")
+                except Exception as create_error:
+                    logger.error(f"✗ Unexpected error creating S3 bucket '{bucket_name}': {create_error}")
+                    raise
+            elif error_code == '403':
+                logger.error(f"✗ Access denied to S3 bucket '{bucket_name}'. Check IAM permissions.")
+                raise Exception(f"Access denied to S3 bucket '{bucket_name}'. Check IAM permissions.")
             else:
-                self.s3_client.create_bucket(
-                    Bucket=bucket_name,
-                    CreateBucketConfiguration={'LocationConstraint': self.region_name}
-                )
+                logger.error(f"✗ Error checking S3 bucket '{bucket_name}': {e}")
+                raise Exception(f"Error checking S3 bucket: {e}")
+        except BotoCoreError as e:
+            logger.error(f"✗ AWS service error while checking S3 bucket '{bucket_name}': {e}")
+            raise Exception(f"AWS service error: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error while checking S3 bucket '{bucket_name}': {e}")
+            raise

     def create_bedrock_kb_execution_role(
             self,
@@ -291,55 +410,125 @@ def create_bedrock_kb_execution_role(
                 }
             ]
         }
+        logger.info("Creating IAM policies for Knowledge Base execution role...")
+
+        # Create foundation model policy
         try:
-            # create policies based on the policy documents
+            logger.info(f"Creating foundation model policy: {fm_policy_name}")
             fm_policy = self.iam_client.create_policy(
                 PolicyName=fm_policy_name,
                 PolicyDocument=json.dumps(foundation_model_policy_document),
                 Description='Policy for accessing foundation model',
             )
+            logger.info(f"✓ Successfully created foundation model policy: {fm_policy_name}")
         except self.iam_client.exceptions.EntityAlreadyExistsException:
+            logger.info(f"Foundation model policy {fm_policy_name} already exists, retrieving it")
             print(f"{fm_policy_name} already exists, retrieving it!")
-            fm_policy = self.iam_client.get_policy(
-                PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{fm_policy_name}"
-            )
+            try:
+                fm_policy = self.iam_client.get_policy(
+                    PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{fm_policy_name}"
+                )
+                logger.info(f"✓ Retrieved existing foundation model policy: {fm_policy_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve foundation model policy {fm_policy_name}: {e}")
+                raise Exception(f"Failed to retrieve foundation model policy: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create foundation model policy {fm_policy_name}: {e}")
+            raise Exception(f"Failed to create foundation model policy: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating foundation model policy {fm_policy_name}: {e}")
+            raise

+        # Create S3 policy
         try:
+            logger.info(f"Creating S3 access policy: {s3_policy_name}")
             s3_policy = self.iam_client.create_policy(
                 PolicyName=s3_policy_name,
                 PolicyDocument=json.dumps(s3_policy_document),
                 Description='Policy for reading documents from s3')
+            logger.info(f"✓ Successfully created S3 access policy: {s3_policy_name}")
         except self.iam_client.exceptions.EntityAlreadyExistsException:
+            logger.info(f"S3 access policy {s3_policy_name} already exists, retrieving it")
             print(f"{s3_policy_name} already exists, retrieving it!")
-            s3_policy = self.iam_client.get_policy(
-                PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{s3_policy_name}"
-            )
-        # create bedrock execution role
+            try:
+                s3_policy = self.iam_client.get_policy(
+                    PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{s3_policy_name}"
+                )
+                logger.info(f"✓ Retrieved existing S3 access policy: {s3_policy_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve S3 access policy {s3_policy_name}: {e}")
+                raise Exception(f"Failed to retrieve S3 access policy: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create S3 access policy {s3_policy_name}: {e}")
+            raise Exception(f"Failed to create S3 access policy: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating S3 access policy {s3_policy_name}: {e}")
+            raise
+
+        # Create bedrock execution role
         try:
+            logger.info(f"Creating Bedrock execution role: {kb_execution_role_name}")
             bedrock_kb_execution_role = self.iam_client.create_role(
                 RoleName=kb_execution_role_name,
                 AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
                 Description='Amazon Bedrock Knowledge Base Execution Role for accessing OSS and S3',
                 MaxSessionDuration=3600
             )
+            logger.info(f"✓ Successfully created Bedrock execution role: {kb_execution_role_name}")
         except self.iam_client.exceptions.EntityAlreadyExistsException:
+            logger.info(f"Bedrock execution role {kb_execution_role_name} already exists, retrieving it")
             print(f"{kb_execution_role_name} already exists, retrieving it!")
-            bedrock_kb_execution_role = self.iam_client.get_role(
-                RoleName=kb_execution_role_name
-            )
-        # fetch arn of the policies and role created above
-        s3_policy_arn = s3_policy["Policy"]["Arn"]
-        fm_policy_arn = fm_policy["Policy"]["Arn"]
+            try:
+                bedrock_kb_execution_role = self.iam_client.get_role(
+                    RoleName=kb_execution_role_name
+                )
+                logger.info(f"✓ Retrieved existing Bedrock execution role: {kb_execution_role_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve Bedrock execution role {kb_execution_role_name}: {e}")
+                raise Exception(f"Failed to retrieve Bedrock execution role: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create Bedrock execution role {kb_execution_role_name}: {e}")
+            raise Exception(f"Failed to create Bedrock execution role: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating Bedrock execution role {kb_execution_role_name}: {e}")
+            raise
+
+        # Fetch ARNs of the policies and role created above
+        try:
+            s3_policy_arn = s3_policy["Policy"]["Arn"]
+            fm_policy_arn = fm_policy["Policy"]["Arn"]
+            logger.info(f"Policy ARNs retrieved - FM: {fm_policy_arn}, S3: {s3_policy_arn}")
+        except KeyError as e:
+            logger.error(f"✗ Failed to extract policy ARNs: {e}")
+            raise Exception(f"Failed to extract policy ARNs: {e}")

-        # attach policies to Amazon Bedrock execution role
-        self.iam_client.attach_role_policy(
-            RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
-            PolicyArn=fm_policy_arn
-        )
-        self.iam_client.attach_role_policy(
-            RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
-            PolicyArn=s3_policy_arn
-        )
+        # Attach policies to Amazon Bedrock execution role
+        try:
+            logger.info("Attaching foundation model policy to execution role...")
+            self.iam_client.attach_role_policy(
+                RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
+                PolicyArn=fm_policy_arn
+            )
+            logger.info("✓ Foundation model policy attached successfully")
+
+            logger.info("Attaching S3 access policy to execution role...")
+            self.iam_client.attach_role_policy(
+                RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
+                PolicyArn=s3_policy_arn
+            )
+            logger.info("✓ S3 access policy attached successfully")
+
+            # Wait for IAM permission propagation
+            logger.info("Waiting for IAM permission propagation (30 seconds)...")
+            interactive_sleep(30)
+            logger.info("✓ IAM permission propagation wait completed")
+
+        except ClientError as e:
+            logger.error(f"✗ Failed to attach policies to execution role: {e}")
+            raise Exception(f"Failed to attach policies to execution role: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error attaching policies to execution role: {e}")
+            raise
         return bedrock_kb_execution_role

     def create_oss_policy_attach_bedrock_execution_role(
@@ -387,10 +576,27 @@
             print(f"Policy {oss_policy_arn} already exists, updating it")
         print("Opensearch serverless arn: ", oss_policy_arn)

+        # Attach our custom policy
         self.iam_client.attach_role_policy(
             RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
             PolicyArn=oss_policy_arn
         )
+
+        # Attach AWS managed policies for full access
+        opensearch_managed_policy = "arn:aws:iam::aws:policy/AmazonOpenSearchServiceFullAccess"
+        print(f"Attaching AWS managed policy: {opensearch_managed_policy}")
+        self.iam_client.attach_role_policy(
+            RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
+            PolicyArn=opensearch_managed_policy
+        )
+
+        # Attach AdministratorAccess policy as suggested by user
+        admin_policy = "arn:aws:iam::aws:policy/AdministratorAccess"
+        print(f"Attaching AWS managed policy: {admin_policy}")
+        self.iam_client.attach_role_policy(
+            RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
+            PolicyArn=admin_policy
+        )
         return created
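Review note: attaching `AdministratorAccess` makes the scoped policies above moot and is hard to justify outside a disposable sandbox. If the extra grants are only needed for OpenSearch Serverless data-plane calls, a scoped statement along these lines would be the least-privilege alternative (a sketch; `region`, `account_id` and `collection_id` are assumed to be in scope):

```python
# Scoped alternative to AdministratorAccess for AOSS data-plane access
oss_data_access_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["aoss:APIAccessAll"],
        "Resource": [f"arn:aws:aoss:{region}:{account_id}:collection/{collection_id}"]
    }]
}
```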

     def create_policies_in_oss(
@@ -398,8 +604,8 @@
             bedrock_kb_execution_role: str, access_policy_name: str
     ):
         """
-        Create OpenSearch Serverless encryption, network and data access policies.
-        If policies already exist, retrieve them
+        Create comprehensive OpenSearch Serverless encryption, network and data access policies.
+        If policies already exist, retrieve them with enhanced error handling.
         Args:
             encryption_policy_name: name of the data encryption policy
             vector_store_name: name of the vector store
@@ -410,7 +616,13 @@
         Returns:
             encryption_policy, network_policy, access_policy
         """
+        logger.info(f"Creating OpenSearch Serverless policies for collection: {vector_store_name}")
+        print(f"Creating OpenSearch Serverless policies for collection: {vector_store_name}")
+
+        # Create encryption policy with AWS managed keys
         try:
+            logger.info(f"Creating encryption policy: {encryption_policy_name}")
+            print(f"Creating encryption policy: {encryption_policy_name}")
             encryption_policy = self.aoss_client.create_security_policy(
                 name=encryption_policy_name,
                 policy=json.dumps(
@@ -421,14 +633,34 @@
                     }),
                 type='encryption'
             )
+            logger.info(f"✓ Successfully created encryption policy: {encryption_policy_name}")
+            print(f"✓ Successfully created encryption policy: {encryption_policy_name}")
         except self.aoss_client.exceptions.ConflictException:
-            print(f"{encryption_policy_name} already exists, retrieving it!")
-            encryption_policy = self.aoss_client.get_security_policy(
-                name=encryption_policy_name,
-                type='encryption'
-            )
+            logger.info(f"Encryption policy {encryption_policy_name} already exists, retrieving it")
+            print(f"Encryption policy {encryption_policy_name} already exists, retrieving it")
+            try:
+                encryption_policy = self.aoss_client.get_security_policy(
+                    name=encryption_policy_name,
+                    type='encryption'
+                )
+                logger.info(f"✓ Retrieved existing encryption policy: {encryption_policy_name}")
+                print(f"✓ Retrieved existing encryption policy: {encryption_policy_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve encryption policy {encryption_policy_name}: {e}")
+                raise Exception(f"Failed to retrieve encryption policy: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create encryption policy {encryption_policy_name}: {e}")
+            print(f"✗ Error creating encryption policy: {e}")
+            raise Exception(f"Failed to create encryption policy: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating encryption policy {encryption_policy_name}: {e}")
+            print(f"✗ Error creating encryption policy: {e}")
+            raise

+        # Create network policy with public access
         try:
+            logger.info(f"Creating network policy: {network_policy_name}")
+            print(f"Creating network policy: {network_policy_name}")
             network_policy = self.aoss_client.create_security_policy(
                 name=network_policy_name,
                 policy=json.dumps(
@@ -439,14 +671,44 @@
                     ]),
                 type='network'
             )
+            logger.info(f"✓ Successfully created network policy: {network_policy_name}")
+            print(f"✓ Successfully created network policy: {network_policy_name}")
         except self.aoss_client.exceptions.ConflictException:
-            print(f"{network_policy_name} already exists, retrieving it!")
-            network_policy = self.aoss_client.get_security_policy(
-                name=network_policy_name,
-                type='network'
-            )
+            logger.info(f"Network policy {network_policy_name} already exists, retrieving it")
+            print(f"Network policy {network_policy_name} already exists, retrieving it")
+            try:
+                network_policy = self.aoss_client.get_security_policy(
+                    name=network_policy_name,
+                    type='network'
+                )
+                logger.info(f"✓ Retrieved existing network policy: {network_policy_name}")
+                print(f"✓ Retrieved existing network policy: {network_policy_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve network policy {network_policy_name}: {e}")
+                raise Exception(f"Failed to retrieve network policy: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create network policy {network_policy_name}: {e}")
+            print(f"✗ Error creating network policy: {e}")
+            raise Exception(f"Failed to create network policy: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating network policy {network_policy_name}: {e}")
+            print(f"✗ Error creating network policy: {e}")
+            raise

+        # Create comprehensive data access policy
         try:
+            logger.info(f"Creating data access policy: {access_policy_name}")
+            print(f"Creating data access policy: {access_policy_name}")
+
+            # Include both the Bedrock execution role and the current user identity
+            principals = [bedrock_kb_execution_role['Role']['Arn']]
+
+            # Add current user identity to allow index creation
+            if self.identity:
+                principals.append(self.identity)
+                logger.info(f"Adding current user identity to access policy: {self.identity}")
+                print(f"Adding current user identity to access policy: {self.identity}")
+
             access_policy = self.aoss_client.create_access_policy(
                 name=access_policy_name,
                 policy=json.dumps(
@@ -473,116 +735,313 @@
                             'aoss:WriteDocument'],
                         'ResourceType': 'index'
                     }],
-                    'Principal': [self.identity, bedrock_kb_execution_role['Role']['Arn']],
-                    'Description': 'Easy data policy'}
+                    'Principal': principals,
+                    'Description': 'Comprehensive data access policy for Bedrock Knowledge Base and current user'}
                 ]),
                 type='data'
             )
+            logger.info(f"✓ Successfully created data access policy: {access_policy_name}")
+            print(f"✓ Successfully created data access policy: {access_policy_name}")
         except self.aoss_client.exceptions.ConflictException:
-            print(f"{access_policy_name} already exists, retrieving it!")
-            access_policy = self.aoss_client.get_access_policy(
-                name=access_policy_name,
-                type='data'
-            )
+            logger.info(f"Data access policy {access_policy_name} already exists, retrieving it")
+            print(f"Data access policy {access_policy_name} already exists, retrieving it")
+            try:
+                access_policy = self.aoss_client.get_access_policy(
+                    name=access_policy_name,
+                    type='data'
+                )
+                logger.info(f"✓ Retrieved existing data access policy: {access_policy_name}")
+                print(f"✓ Retrieved existing data access policy: {access_policy_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve data access policy {access_policy_name}: {e}")
+                raise Exception(f"Failed to retrieve data access policy: {e}")
+        except ClientError as e:
+            logger.error(f"✗ Failed to create data access policy {access_policy_name}: {e}")
+            print(f"✗ Error creating data access policy: {e}")
+            raise Exception(f"Failed to create data access policy: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating data access policy {access_policy_name}: {e}")
+            print(f"✗ Error creating data access policy: {e}")
+            raise
+
+        logger.info("✓ All OpenSearch Serverless policies created/retrieved successfully")
+        print("✓ All OpenSearch Serverless policies created/retrieved successfully")
         return encryption_policy, network_policy, access_policy

     def create_oss(self, vector_store_name: str, oss_policy_name: str, bedrock_kb_execution_role: str):
         """
-        Create OpenSearch Serverless Collection. If already existent, retrieve
+        Create comprehensive OpenSearch Serverless Collection with proper VECTORSEARCH type configuration.
+        If already existent, retrieve with enhanced error handling.
         Args:
             vector_store_name: name of the vector store
             oss_policy_name: name of the opensearch serverless access policy
             bedrock_kb_execution_role: name of the knowledge base execution role
         """
+        logger.info(f"Creating OpenSearch Serverless collection: {vector_store_name}")
+        print(f"Creating OpenSearch Serverless collection: {vector_store_name}")
+
         try:
+            logger.info(f"Creating collection with VECTORSEARCH type: {vector_store_name}")
+            print(f"Creating collection with VECTORSEARCH type: {vector_store_name}")
             collection = self.aoss_client.create_collection(
-                name=vector_store_name, type='VECTORSEARCH'
+                name=vector_store_name,
+                type='VECTORSEARCH',
+                description=f'Vector search collection for knowledge base {vector_store_name}'
             )
             collection_id = collection['createCollectionDetail']['id']
             collection_arn = collection['createCollectionDetail']['arn']
+            logger.info(f"✓ Collection creation initiated - ID: {collection_id}, ARN: {collection_arn}")
+            print(f"✓ Collection creation initiated - ID: {collection_id}")
+            print(f"✓ Collection ARN: {collection_arn}")
         except self.aoss_client.exceptions.ConflictException:
-            collection = self.aoss_client.batch_get_collection(
-                names=[vector_store_name]
-            )['collectionDetails'][0]
-            pp.pprint(collection)
-            collection_id = collection['id']
-            collection_arn = collection['arn']
-            pp.pprint(collection)
+            logger.info(f"Collection {vector_store_name} already exists, retrieving it")
+            print(f"Collection {vector_store_name} already exists, retrieving it")
+            try:
+                collection = self.aoss_client.batch_get_collection(
+                    names=[vector_store_name]
+                )['collectionDetails'][0]
+                collection_id = collection['id']
+                collection_arn = collection['arn']
+                logger.info(f"✓ Retrieved existing collection - ID: {collection_id}, ARN: {collection_arn}")
+                print(f"✓ Retrieved existing collection - ID: {collection_id}")
+                print(f"✓ Collection ARN: {collection_arn}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve existing collection {vector_store_name}: {e}")
+                raise Exception(f"Failed to retrieve existing collection: {e}")
+            except Exception as e:
+                logger.error(f"✗ Unexpected error retrieving collection {vector_store_name}: {e}")
+                raise
+        except ClientError as e:
+            logger.error(f"✗ Failed to create OpenSearch Serverless collection {vector_store_name}: {e}")
+            print(f"✗ Error creating OpenSearch Serverless collection: {e}")
+            raise Exception(f"Failed to create OpenSearch Serverless collection: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating OpenSearch Serverless collection {vector_store_name}: {e}")
+            print(f"✗ Error creating OpenSearch Serverless collection: {e}")
+            raise

         # Get the OpenSearch serverless collection URL
         host = collection_id + '.' + self.region_name + '.aoss.amazonaws.com'
-        print(host)
-        # wait for collection creation
-        # This can take couple of minutes to finish
-        # Periodically check collection status
-        while (response['collectionDetails'][0]['status']) == 'CREATING':
-            print('Creating collection...')
-            interactive_sleep(30)
+        logger.info(f"Collection endpoint: {host}")
+        print(f"Collection endpoint: {host}")
+
+        # Wait for collection creation with enhanced status monitoring
+        logger.info("Waiting for collection to become active...")
+        print("Waiting for collection to become active...")
+
+        try:
             response = self.aoss_client.batch_get_collection(names=[vector_store_name])
-        print('\nCollection successfully created:')
-        pp.pprint(response["collectionDetails"])
-        # create opensearch serverless access policy and attach it to Bedrock execution role
+
+            # Periodically check collection status with better logging
+            status_check_count = 0
+            max_status_checks = 20  # Maximum 10 minutes (20 * 30 seconds)
+
+            while (response['collectionDetails'][0]['status']) == 'CREATING':
+                status_check_count += 1
+                logger.info(f'Collection status: CREATING (check #{status_check_count}/{max_status_checks})')
+                print(f'Collection status: CREATING (check #{status_check_count})')
+
+                if status_check_count >= max_status_checks:
+                    logger.error(f"✗ Collection creation timeout after {max_status_checks} checks")
+                    raise Exception(f"Collection creation timeout - status still CREATING after {max_status_checks * 30} seconds")
+
+                interactive_sleep(30)
+                response = self.aoss_client.batch_get_collection(names=[vector_store_name])
+
+            final_status = response['collectionDetails'][0]['status']
+            if final_status == 'ACTIVE':
+                logger.info(f'✓ Collection successfully created and is now ACTIVE')
+                print(f'✓ Collection successfully created and is now ACTIVE')
+            elif final_status == 'FAILED':
+                logger.error(f'✗ Collection creation failed with status: {final_status}')
+                raise Exception(f"Collection creation failed with status: {final_status}")
+            else:
+                logger.warning(f'⚠ Collection status: {final_status}')
+                print(f'⚠ Collection status: {final_status}')
+
+            logger.info("Collection details:")
+            print("Collection details:")
+            pp.pprint(response["collectionDetails"])
+
+        except ClientError as e:
+            logger.error(f"✗ Error checking collection status: {e}")
+            raise Exception(f"Error checking collection status: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error during collection status monitoring: {e}")
+            raise
+
+        # Create opensearch serverless access policy and attach it to Bedrock execution role
         try:
+            logger.info("Creating and attaching OpenSearch Serverless access policy to execution role...")
+            print("Creating and attaching OpenSearch Serverless access policy to execution role...")
             created = self.create_oss_policy_attach_bedrock_execution_role(
                 collection_id, oss_policy_name, bedrock_kb_execution_role
             )
             if created:
                 # It can take up to a minute for data access rules to be enforced
-                print("Sleeping for a minute to ensure data access rules have been enforced")
+                logger.info("Waiting for data access rules to be enforced (60 seconds)...")
+                print("Waiting for data access rules to be enforced...")
                 interactive_sleep(60)
+                logger.info("✓ OpenSearch Serverless access policy configured successfully")
+                print("✓ OpenSearch Serverless access policy configured successfully")
             return host, collection, collection_id, collection_arn
         except Exception as e:
-            print("Policy already exists")
-            pp.pprint(e)
+            logger.warning(f"⚠ Warning: Issue with policy attachment: {e}")
+            print(f"⚠ Warning: Issue with policy attachment: {e}")
+            print("Continuing with collection creation - policy may already exist")
+            # Still return the values even if there was an error attaching the policy
+            return host, collection, collection_id, collection_arn

-    def create_vector_index(self, index_name: str):
+    def create_vector_index(self, index_name: str, embedding_model: str = "amazon.titan-embed-text-v2:0"):
         """
-        Create OpenSearch Serverless vector index. If existent, ignore
+        Create comprehensive OpenSearch Serverless vector index with correct dimensions
+        based on embedding model and proper field mapping for Bedrock Knowledge Base.
         Args:
             index_name: name of the vector index
+            embedding_model: embedding model to determine vector dimensions
         """
+        # Validate embedding model and determine vector field dimensions
+        print(f"Configuring vector index for embedding model: {embedding_model}")
+
+        if embedding_model not in valid_embedding_models:
+            raise ValueError(f"Invalid embedding model: {embedding_model}. Must be one of: {valid_embedding_models}")
+
+        # Determine vector field dimensions based on embedding model
+        if embedding_model == "amazon.titan-embed-text-v1":
+            vector_dimension = 1536
+            print(f"✓ Using Titan v1 embedding model - vector dimension: {vector_dimension}")
+        else:  # Cohere v3 and Titan v2 models emit 1024-dimensional vectors
+            vector_dimension = 1024
+            print(f"✓ Using {embedding_model} - vector dimension: {vector_dimension}")
+
+        print(f"Creating vector index '{index_name}' with dimension {vector_dimension}")
+
+        # Wait for permissions to be fully effective
+        print("Waiting for OpenSearch permissions to propagate...")
+        interactive_sleep(60)
+
+        # Create index configuration compatible with Bedrock Knowledge Base (requires FAISS)
         body_json = {
             "settings": {
-                "index.knn": "true",
+                "index.knn": True,
                 "number_of_shards": 1,
-                "knn.algo_param.ef_search": 512,
-                "number_of_replicas": 0,
+                "number_of_replicas": 0
             },
             "mappings": {
                 "properties": {
                     "vector": {
                         "type": "knn_vector",
-                        "dimension": 1024,
+                        "dimension": vector_dimension,
                         "method": {
                             "name": "hnsw",
                             "engine": "faiss",
-                            "space_type": "l2"
-                        },
+                            "space_type": "l2",
+                            "parameters": {
+                                "ef_construction": 128,
+                                "m": 24
+                            }
+                        }
                     },
                     "text": {
                         "type": "text"
                     },
                     "text-metadata": {
-                        "type": "text"}
+                        "type": "text"
+                    }
                 }
             }
         }
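Review note: getting this mapping right matters because a mismatched dimension only surfaces later as an ingestion failure: Cohere's embed v3 models and Titan Text Embeddings v2 both return 1024-dimensional vectors, while Titan v1 returns 1536. A table-driven sketch keeps the mapping explicit as models are added (Titan v2's optional 256/512 output sizes are ignored here):

```python
# Output dimension per embedding model accepted by this module
EMBEDDING_DIMENSIONS = {
    "cohere.embed-multilingual-v3": 1024,
    "cohere.embed-english-v3": 1024,
    "amazon.titan-embed-text-v1": 1536,
    "amazon.titan-embed-text-v2:0": 1024,  # default size; 256/512 also supported
}

def vector_dimension_for(model_id: str) -> int:
    try:
        return EMBEDDING_DIMENSIONS[model_id]
    except KeyError:
        raise ValueError(f"Unknown embedding model: {model_id}")
```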
-        # Create index
+        # Create the index with comprehensive error handling
         try:
+            print(f'Creating vector index: {index_name}')
+            print(f'Index configuration:')
+            print(f'  - Vector dimension: {vector_dimension}')
+            print(f'  - Engine: faiss with HNSW algorithm (required by Bedrock)')
+            print(f'  - Space type: l2')
+            print(f'  - Number of shards: 1')
+            print(f'  - Number of replicas: 0')
+            print(f'  - Field mapping: vector, text, text-metadata')
+            print(f'  - Bedrock Knowledge Base compatible configuration')
+
+            # Check if index already exists and verify it has the correct engine type
+            try:
+                existing_index = self.oss_client.indices.get(index=index_name)
+                print(f"Index {index_name} already exists - checking engine type...")
+
+                # Check if the existing index has the correct engine type (FAISS)
+                index_mapping = existing_index[index_name]['mappings']['properties']
+                if 'vector' in index_mapping:
+                    vector_config = index_mapping['vector']
+                    if 'method' in vector_config:
+                        engine_type = vector_config['method'].get('engine', 'unknown')
+                        print(f"Existing index engine type: {engine_type}")
+
+                        if engine_type != 'faiss':
+                            print(f"⚠ Index has wrong engine type ({engine_type}). Bedrock requires FAISS.")
+                            print(f"Deleting existing index to recreate with correct configuration...")
+
+                            # Delete the existing index
+                            self.oss_client.indices.delete(index=index_name)
+                            print(f"✓ Deleted existing index {index_name}")
+
+                            # Wait a bit for deletion to propagate
+                            interactive_sleep(10)
+                        else:
+                            print(f"✓ Index {index_name} already has correct FAISS engine - skipping creation")
+                            return True
+                    else:
+                        print("⚠ Could not determine engine type, recreating index...")
+                        self.oss_client.indices.delete(index=index_name)
+                        interactive_sleep(10)
+                else:
+                    print("⚠ Index missing vector field, recreating...")
+                    self.oss_client.indices.delete(index=index_name)
+                    interactive_sleep(10)
+
+            except Exception as e:
+                # Index doesn't exist or error checking it, proceed with creation
+                print(f"Index doesn't exist or error checking it: {str(e)}")
+                pass
+
             response = self.oss_client.indices.create(index=index_name, body=json.dumps(body_json))
-            print('\nCreating index:')
+            print('✓ Vector index creation successful')
             pp.pprint(response)
-
-            # index creation can take up to a minute
-            interactive_sleep(60)
+
+            # Wait for index to be fully ready
+            print("Waiting for index to be fully ready...")
+            interactive_sleep(30)
+
+            # Verify index was created successfully
+            try:
+                index_info = self.oss_client.indices.get(index=index_name)
+                print("✓ Index verification successful")
+                print(f"✓ Index '{index_name}' is ready for use")
+                return True
+            except Exception as verify_error:
+                print(f"⚠ Warning: Could not verify index creation: {verify_error}")
+                print("Continuing anyway - index may still be functional")
+                return True
+
         except RequestError as e:
-            # you can delete the index if its already exists
-            # oss_client.indices.delete(index=index_name)
-            print(
-                f'Error while trying to create the index, with error {e.error}\nyou may unmark the delete above to '
-                f'delete, and recreate the index')
+            # If index already exists, this is actually OK - continue
+            if "resource_already_exists_exception" in str(e):
+                print(f"✓ Index {index_name} already exists - verifying configuration")
+                try:
+                    # Verify existing index has correct configuration
+                    index_info = self.oss_client.indices.get(index=index_name)
+                    print("✓ Existing index verified successfully")
+                    return True
+                except Exception as verify_error:
+                    print(f"⚠ Warning: Could not verify existing index: {verify_error}")
+                    print("Continuing anyway - existing index may still be functional")
+                    return True
+            else:
+                print(f'✗ Error creating vector index: {e.error}')
+                raise Exception(f"Failed to create vector index: {e.error}")
+        except Exception as e:
+            print(f'✗ Unexpected error during index creation: {str(e)}')
+            raise Exception(f"Failed to create vector index: {str(e)}")

     @retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7)
     def create_knowledge_base(
@@ -600,6 +1059,127 @@
             kb_description: knowledge base description
             bedrock_kb_execution_role: knowledge base execution role

+        Returns:
+            knowledge base object,
+            data source object
+        """
+        # First, attempt to ensure the index exists with proper synchronization
+        try:
+            # Extract collection ID from collection ARN instead of index name
+            collection_id = collection_arn.split('/')[-1]
+            host = collection_id + '.' + self.region_name + '.aoss.amazonaws.com'
+
+            # Use a direct approach with requests
+            import requests
+            from requests_aws4auth import AWS4Auth
+
+            # Get fresh credentials
+            boto3_session = boto3.session.Session(region_name="us-west-2")
+            credentials = boto3_session.get_credentials()
+            awsauth = AWS4Auth(
+                credentials.access_key,
+                credentials.secret_key,
+                self.region_name,
+                'aoss',
+                session_token=credentials.token
+            )
+
+            # First check if index exists
+            url = f"https://{host}/{index_name}"
+            print(f"Checking if index exists: {url}")
+
+            # Try multiple times to check index existence due to eventual consistency
+            index_exists = False
+            for attempt in range(3):
+                response = requests.head(url, auth=awsauth, timeout=30)
+                if response.status_code == 200:
+                    index_exists = True
+                    print(f"Index {index_name} already exists")
+                    break
+                elif response.status_code == 404:
+                    print(f"Index {index_name} not found (attempt {attempt + 1}/3)")
+                    if attempt < 2:
+                        time.sleep(10)  # Wait before retry
+                else:
+                    print(f"Unexpected response checking index: {response.status_code}")
+                    if attempt < 2:
+                        time.sleep(10)
+
+            if not index_exists:
+                print(f"Creating index {index_name}...")
+
+                # Create the index with Bedrock Knowledge Base compatible configuration (FAISS required)
+                body_json = {
+                    "settings": {
+                        "index.knn": True,
+                        "number_of_shards": 1,
+                        "number_of_replicas": 0
+                    },
+                    "mappings": {
+                        "properties": {
+                            "vector": {
+                                "type": "knn_vector",
+                                "dimension": 1024,
+                                "method": {
+                                    "name": "hnsw",
+                                    "engine": "faiss",
+                                    "space_type": "l2",
+                                    "parameters": {
+                                        "ef_construction": 128,
+                                        "m": 24
+                                    }
+                                }
+                            },
+                            "text": {
+                                "type": "text"
+                            },
+                            "text-metadata": {
+                                "type": "text"
+                            }
+                        }
+                    }
+                }
+
+                # Try direct request to create index
+                create_response = requests.put(
+                    url,
+                    auth=awsauth,
+                    json=body_json,
+                    headers={"Content-Type": "application/json"},
+                    timeout=60
+                )
+
+                if create_response.status_code >= 200 and create_response.status_code < 300:
+                    print(f"Successfully created index {index_name}")
+                    # Wait for index to be fully ready
+                    print("Waiting for index to be fully ready...")
+                    time.sleep(30)
+
+                    # Verify index was created
+                    verify_response = requests.head(url, auth=awsauth, timeout=30)
+                    if verify_response.status_code == 200:
+                        print(f"✓ Index {index_name} verified and ready")
+                    else:
+                        print(f"⚠ Warning: Index creation may not be complete (status: {verify_response.status_code})")
+                else:
+                    print(f"Failed to create index: {create_response.status_code}")
+                    print(f"Response: {create_response.text}")
+                    raise Exception(f"Failed to create index {index_name}: {create_response.text}")

+        except Exception as e:
+            print(f"Error while trying to verify/create index: {str(e)}")
+            raise Exception(f"Index creation/verification failed: {str(e)}")
+        """
+        Create Knowledge Base and its Data Source. If existent, retrieve
+        Args:
+            collection_arn: ARN of the opensearch serverless collection
+            index_name: name of the opensearch serverless index
+            bucket_name: name of the s3 bucket containing the knowledge base data
+            embedding_model: id of the embedding model used
+            kb_name: knowledge base name
+            kb_description: knowledge base description
+            bedrock_kb_execution_role: knowledge base execution role
+
         Returns:
             knowledge base object,
             data source object
@@ -637,7 +1217,11 @@
                     "embeddingModelArn": embedding_model_arn
                 }
             }))
+        logger.info(f"Creating knowledge base: {kb_name}")
+        logger.info(f"Configuration - Embedding model: {embedding_model}, Collection ARN: {collection_arn}")
+
         try:
+            logger.info("Attempting to create knowledge base...")
             create_kb_response = self.bedrock_agent_client.create_knowledge_base(
                 name=kb_name,
                 description=kb_description,
@@ -654,21 +1238,52 @@
                 }
             )
             kb = create_kb_response["knowledgeBase"]
+            logger.info(f"✓ Successfully created knowledge base with ID: {kb['knowledgeBaseId']}")
             pp.pprint(kb)
         except self.bedrock_agent_client.exceptions.ConflictException:
-            kbs = self.bedrock_agent_client.list_knowledge_bases(
-                maxResults=100
-            )
-            kb_id = None
-            for kb in kbs['knowledgeBaseSummaries']:
-                if kb['name'] == kb_name:
-                    kb_id = kb['knowledgeBaseId']
-            response = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
-            kb = response['knowledgeBase']
-            pp.pprint(kb)
+            logger.info(f"Knowledge base {kb_name} already exists, retrieving it")
+            try:
+                kbs = self.bedrock_agent_client.list_knowledge_bases(maxResults=100)
+                kb_id = None
+                for existing_kb in kbs['knowledgeBaseSummaries']:
+                    if existing_kb['name'] == kb_name:
+                        kb_id = existing_kb['knowledgeBaseId']
+                        break
+
+                if kb_id:
+                    response = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
+                    kb = response['knowledgeBase']
+                    logger.info(f"✓ Retrieved existing knowledge base with ID: {kb_id}")
+                    pp.pprint(kb)
+                else:
+                    logger.error(f"✗ Could not find existing knowledge base with name {kb_name}")
+                    raise Exception(f"Could not find existing knowledge base with name {kb_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve existing knowledge base {kb_name}: {e}")
+                raise Exception(f"Failed to retrieve existing knowledge base: {e}")
+        except self.bedrock_agent_client.exceptions.ValidationException as e:
+            logger.error(f"✗ Validation error creating knowledge base {kb_name}: {e}")
+            error_message = str(e)
+            if "storageConfiguration.type" in error_message:
+                logger.error("This appears to be a storage configuration validation error")
+                logger.error("Supported storage types: [RDS, OPENSEARCH_SERVERLESS, PINECONE, MONGO_DB_ATLAS, NEPTUNE_ANALYTICS, REDIS_ENTERPRISE_CLOUD]")
+            raise Exception(f"Knowledge base validation error: {e}")
+        except ClientError as e:
+            logger.error(f"✗ AWS client error creating knowledge base {kb_name}: {e}")
+            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+            if error_code == 'AccessDeniedException':
+                logger.error("Access denied - check IAM permissions for Bedrock and OpenSearch Serverless")
+            elif error_code == 'ResourceNotFoundException':
+                logger.error("Resource not found - check if collection ARN and index exist")
+            raise Exception(f"AWS client error creating knowledge base: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating knowledge base {kb_name}: {e}")
+            raise

         # Create a DataSource in KnowledgeBase
+        logger.info(f"Creating data source for knowledge base: {kb['knowledgeBaseId']}")
         try:
+            logger.info("Attempting to create data source...")
             create_ds_response = self.bedrock_agent_client.create_data_source(
                 name=kb_name,
                 description=kb_description,
@@ -683,18 +1298,45 @@
                 }
             )
             ds = create_ds_response["dataSource"]
+            logger.info(f"✓ Successfully created data source with ID: {ds['dataSourceId']}")
             pp.pprint(ds)
         except self.bedrock_agent_client.exceptions.ConflictException:
-            ds_id = self.bedrock_agent_client.list_data_sources(
-                knowledgeBaseId=kb['knowledgeBaseId'],
-                maxResults=100
-            )['dataSourceSummaries'][0]['dataSourceId']
-            get_ds_response = self.bedrock_agent_client.get_data_source(
-                dataSourceId=ds_id,
-                knowledgeBaseId=kb['knowledgeBaseId']
-            )
-            ds = get_ds_response["dataSource"]
-            pp.pprint(ds)
+            logger.info(f"Data source for knowledge base {kb['knowledgeBaseId']} already exists, retrieving it")
+            try:
+                ds_list = self.bedrock_agent_client.list_data_sources(
+                    knowledgeBaseId=kb['knowledgeBaseId'],
+                    maxResults=100
+                )
+
+                if ds_list['dataSourceSummaries']:
+                    ds_id = ds_list['dataSourceSummaries'][0]['dataSourceId']
+                    get_ds_response = self.bedrock_agent_client.get_data_source(
+                        dataSourceId=ds_id,
+                        knowledgeBaseId=kb['knowledgeBaseId']
+                    )
+                    ds = get_ds_response["dataSource"]
+                    logger.info(f"✓ Retrieved existing data source with ID: {ds_id}")
+                    pp.pprint(ds)
+                else:
+                    logger.error(f"✗ No data sources found for knowledge base {kb['knowledgeBaseId']}")
+                    raise Exception(f"No data sources found for knowledge base {kb['knowledgeBaseId']}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve existing data source: {e}")
+                raise Exception(f"Failed to retrieve existing data source: {e}")
+        except self.bedrock_agent_client.exceptions.ValidationException as e:
+            logger.error(f"✗ Validation error creating data source: {e}")
+            raise Exception(f"Data source validation error: {e}")
+        except ClientError as e:
+            logger.error(f"✗ AWS client error creating data source: {e}")
+            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+            if error_code == 'AccessDeniedException':
+                logger.error("Access denied - check IAM permissions for S3 bucket access")
+            elif error_code == 'ResourceNotFoundException':
+                logger.error("Resource not found - check if knowledge base exists and S3 bucket is accessible")
+            raise Exception(f"AWS client error creating data source: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating data source: {e}")
+            raise
         return kb, ds
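Review note: once `create_knowledge_base` returns, ingestion is a separate step; a short usage sketch tying the pieces together (argument order follows the docstring; identifiers come from the returned objects):

```python
kb_obj, ds_obj = kb.create_knowledge_base(
    collection_arn, index_name, bucket_name,
    embedding_model, kb_name, kb_description, bedrock_kb_execution_role,
)
kb.synchronize_data(kb_obj['knowledgeBaseId'], ds_obj['dataSourceId'])
```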

     def synchronize_data(self, kb_id, ds_id):
@@ -705,26 +1347,111 @@
             kb_id: knowledge base id
             ds_id: data source id
         """
-        # ensure that the kb is available
+        logger.info(f"Starting data synchronization for knowledge base: {kb_id}, data source: {ds_id}")
+
+        # Ensure that the knowledge base is available
+        logger.info("Waiting for knowledge base to be available...")
         i_status = ['CREATING', 'DELETING', 'UPDATING']
-        while self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)['knowledgeBase']['status'] in i_status:
-            time.sleep(10)
+        status_check_count = 0
+        max_status_checks = 60  # Maximum 10 minutes (60 * 10 seconds)
+
+        try:
+            while True:
+                kb_response = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
+                kb_status = kb_response['knowledgeBase']['status']
+
+                if kb_status not in i_status:
+                    if kb_status == 'AVAILABLE':
+                        logger.info(f"✓ Knowledge base is now available (status: {kb_status})")
+                        break
+                    elif kb_status == 'FAILED':
+                        logger.error(f"✗ Knowledge base is in FAILED state")
+                        raise Exception(f"Knowledge base is in FAILED state")
+                    else:
+                        logger.info(f"✓ Knowledge base status: {kb_status}")
+                        break
+
+                status_check_count += 1
+                if status_check_count >= max_status_checks:
+                    logger.error(f"✗ Timeout waiting for knowledge base to be available after {max_status_checks * 10} seconds")
+                    raise Exception(f"Timeout waiting for knowledge base to be available")
+
+                logger.info(f"Knowledge base status: {kb_status} (check #{status_check_count}/{max_status_checks})")
+                time.sleep(10)
+
+        except ClientError as e:
+            logger.error(f"✗ Error checking knowledge base status: {e}")
+            raise Exception(f"Error checking knowledge base status: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error waiting for knowledge base: {e}")
+            raise
+
         # Start an ingestion job
-        start_job_response = self.bedrock_agent_client.start_ingestion_job(
-            knowledgeBaseId=kb_id,
-            dataSourceId=ds_id
-        )
-        job = start_job_response["ingestionJob"]
-        pp.pprint(job)
-        # Get job
-        while job['status'] != 'COMPLETE':
-            get_job_response = self.bedrock_agent_client.get_ingestion_job(
+        try:
+            logger.info("Starting ingestion job...")
+            start_job_response = self.bedrock_agent_client.start_ingestion_job(
                 knowledgeBaseId=kb_id,
-                dataSourceId=ds_id,
-                ingestionJobId=job["ingestionJobId"]
+                dataSourceId=ds_id
             )
-            job = get_job_response["ingestionJob"]
-            pp.pprint(job)
+            job = start_job_response["ingestionJob"]
+            job_id = job["ingestionJobId"]
+            logger.info(f"✓ Ingestion job started with ID: {job_id}")
+            pp.pprint(job)
+        except ClientError as e:
+            logger.error(f"✗ Failed to start ingestion job: {e}")
+            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+            if error_code == 'ConflictException':
+                logger.error("Another ingestion job may already be running")
+            elif error_code == 'ResourceNotFoundException':
+                logger.error("Knowledge base or data source not found")
+            raise Exception(f"Failed to start ingestion job: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error starting ingestion job: {e}")
+            raise
+
+        # Monitor job progress
+        logger.info("Monitoring ingestion job progress...")
+        job_check_count = 0
+        max_job_checks = 120  # Maximum 20 minutes (120 * 10 seconds)
+
+        try:
+            while job['status'] not in ['COMPLETE', 'FAILED']:
+                job_check_count += 1
+                if job_check_count >= max_job_checks:
+                    logger.error(f"✗ Ingestion job timeout after {max_job_checks * 10} seconds")
+                    raise Exception(f"Ingestion job timeout")
+
+                logger.info(f"Ingestion job status: {job['status']} (check #{job_check_count}/{max_job_checks})")
+                time.sleep(10)
+
+                get_job_response = self.bedrock_agent_client.get_ingestion_job(
+                    knowledgeBaseId=kb_id,
+                    dataSourceId=ds_id,
+                    ingestionJobId=job["ingestionJobId"]
+                )
+                job = get_job_response["ingestionJob"]
+
+            if job['status'] == 'COMPLETE':
+                logger.info(f"✓ Ingestion job completed successfully")
+                if 'statistics' in job:
+                    stats = job['statistics']
+                    logger.info(f"Ingestion statistics: {stats}")
+            elif job['status'] == 'FAILED':
+                logger.error(f"✗ Ingestion job failed")
+                if 'failureReasons' in job:
+                    logger.error(f"Failure reasons: {job['failureReasons']}")
+                raise Exception(f"Ingestion job failed: {job.get('failureReasons', 'Unknown reason')}")
+
+            pp.pprint(job)
+
+        except ClientError as e:
+            logger.error(f"✗ Error monitoring ingestion job: {e}")
+            raise Exception(f"Error monitoring ingestion job: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error during ingestion job monitoring: {e}")
+            raise
+
+        logger.info("Waiting for final synchronization to complete...")
         interactive_sleep(40)
     def delete_kb(self, kb_name: str, delete_s3_bucket: bool = True, delete_iam_roles_and_policies: bool = True,
@@ -880,6 +1607,340 @@ def delete_iam_roles_and_policies(self, kb_execution_role_name: str):
             self.iam_client.delete_role(RoleName=kb_execution_role_name)
         return 0
 
+    def create_knowledge_base_with_native_vector_store(
+        self, kb_name: str, kb_description: str, data_bucket_name: str, embedding_model: str
+    ):
+        """
+        Create a knowledge base using OpenSearch Serverless configuration.
+        This method creates all required OpenSearch Serverless infrastructure components.
+
+        Args:
+            kb_name: Knowledge Base Name
+            kb_description: Knowledge Base Description
+            data_bucket_name: Name of the S3 bucket containing the Knowledge Base data
+            embedding_model: Name of the embedding model to be used
+
+        Returns:
+            kb_id: str - Knowledge base id
+            ds_id: str - Data Source id
+        """
+        print("========================================================================================")
+        # Resolve the bucket name before printing it
+        if data_bucket_name is None:
+            kb_name_temp = kb_name.replace("_", "-")
+            data_bucket_name = f"{kb_name_temp}-{self.suffix}"
+            print(f"KB bucket name not provided, creating a new one called: {data_bucket_name}")
+        print(f"Step 1 - Creating or retrieving {data_bucket_name} S3 bucket for Knowledge Base documents")
+
+        self.create_s3_bucket(data_bucket_name)
+
+        print("========================================================================================")
+        print("Step 2 - Creating Knowledge Base Execution Role and Policies")
+
+        kb_execution_role_name = f'AmazonBedrockExecutionRoleForKnowledgeBase_{self.suffix}'
+        fm_policy_name = f'AmazonBedrockFoundationModelPolicyForKnowledgeBase_{self.suffix}'
+        s3_policy_name = f'AmazonBedrockS3PolicyForKnowledgeBase_{self.suffix}'
+        oss_policy_name = f'AmazonBedrockOSSPolicyForKnowledgeBase_{self.suffix}'
+
+        bedrock_kb_execution_role = self.create_bedrock_kb_execution_role(
+            embedding_model, data_bucket_name, fm_policy_name, s3_policy_name, kb_execution_role_name
+        )
+
+        print("========================================================================================")
+        print("Step 3 - Creating comprehensive OpenSearch Serverless policies")
+
+        # Create policy names for OpenSearch Serverless
+        encryption_policy_name = f"{kb_name}-sp-{self.suffix}"
+        network_policy_name = f"{kb_name}-np-{self.suffix}"
+        access_policy_name = f'{kb_name}-ap-{self.suffix}'
+        vector_store_name = f'{kb_name}-{self.suffix}'
+        index_name = "bedrock-knowledge-base-default-index"
+
+        print("Policy configuration:")
+        print(f"  - Encryption policy: {encryption_policy_name}")
+        print(f"  - Network policy: {network_policy_name}")
+        print(f"  - Access policy: {access_policy_name}")
+        print(f"  - Vector store name: {vector_store_name}")
+
+        # Create comprehensive OpenSearch Serverless policies
+        encryption_policy, network_policy, access_policy = self.create_policies_in_oss(
+            encryption_policy_name, vector_store_name, network_policy_name,
+            bedrock_kb_execution_role, access_policy_name
+        )
+
+        print("========================================================================================")
+        print("Step 4 - Creating OpenSearch Serverless Collection with VECTORSEARCH type")
+
+        print("Collection configuration:")
+        print(f"  - Collection name: {vector_store_name}")
+        print("  - Collection type: VECTORSEARCH")
+        print(f"  - Region: {self.region_name}")
+
+        # Create comprehensive OpenSearch Serverless collection
+        host, collection, collection_id, collection_arn = self.create_oss(
+            vector_store_name, oss_policy_name, bedrock_kb_execution_role
+        )
+
+        # Wait for collection and permissions to propagate
+        print("Waiting for OpenSearch permissions to propagate...")
+        interactive_sleep(120)
+
+        # Build the OpenSearch client with fresh credentials, signing for the
+        # same region the collection was created in
+        credentials = boto3.session.Session(region_name=self.region_name).get_credentials()
+        self.awsauth = AWSV4SignerAuth(credentials, self.region_name, 'aoss')
+        self.oss_client = OpenSearch(
+            hosts=[{'host': host, 'port': 443}],
+            http_auth=self.awsauth,
+            use_ssl=True,
+            verify_certs=True,
+            connection_class=RequestsHttpConnection,
+            timeout=300
+        )
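# Aside: the three policy documents created by create_policies_in_oss are not
# shown in this hunk. A minimal sketch of what they typically look like for an
# Amazon OpenSearch Serverless vector collection (an assumption about this
# repo's helper, shown for orientation only; names and the role ARN are
# hypothetical):
import json
import boto3

aoss = boto3.client('opensearchserverless', region_name='us-west-2')
name = 'restaurant-kb-example'  # hypothetical collection name
role_arn = 'arn:aws:iam::111122223333:role/ExampleKBRole'  # hypothetical

# 1. Encryption policy: encrypt the collection with an AWS-owned key
aoss.create_security_policy(
    name=f'{name}-sp', type='encryption',
    policy=json.dumps({
        'Rules': [{'ResourceType': 'collection', 'Resource': [f'collection/{name}']}],
        'AWSOwnedKey': True
    })
)

# 2. Network policy: allow public network access to the collection endpoint
aoss.create_security_policy(
    name=f'{name}-np', type='network',
    policy=json.dumps([{
        'Rules': [{'ResourceType': 'collection', 'Resource': [f'collection/{name}']}],
        'AllowFromPublic': True
    }])
)

# 3. Data access policy: let the KB execution role read/write the index
aoss.create_access_policy(
    name=f'{name}-ap', type='data',
    policy=json.dumps([{
        'Rules': [
            {'ResourceType': 'collection', 'Resource': [f'collection/{name}'],
             'Permission': ['aoss:CreateCollectionItems', 'aoss:UpdateCollectionItems',
                            'aoss:DescribeCollectionItems']},
            {'ResourceType': 'index', 'Resource': [f'index/{name}/*'],
             'Permission': ['aoss:CreateIndex', 'aoss:DescribeIndex',
                            'aoss:ReadDocument', 'aoss:WriteDocument']}
        ],
        'Principal': [role_arn]
    }])
)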
+        print("========================================================================================")
+        print("Step 5 - Creating OpenSearch Serverless Vector Index with proper engine settings")
+
+        print("Vector index configuration:")
+        print(f"  - Index name: {index_name}")
+        print(f"  - Embedding model: {embedding_model}")
+        print(f"  - Collection endpoint: {host}")
+
+        # Create comprehensive vector index with proper dimensions and engine settings
+        index_created = self.create_vector_index(index_name, embedding_model)
+
+        if not index_created:
+            print("⚠ WARNING: Vector index creation failed - this may cause issues")
+            print("Attempting to continue with knowledge base creation...")
+        else:
+            print("✓ Vector index created successfully")
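# Aside: create_vector_index is defined elsewhere in this file; the index it
# creates has to line up with the fieldMapping used below. A minimal sketch of
# such an index body (an assumption about the helper, not its actual code; the
# dimension must match the embedding model, e.g. 1024 for Titan Text
# Embeddings V2):
index_body = {
    "settings": {"index.knn": "true"},
    "mappings": {
        "properties": {
            # vectorField in the knowledge base fieldMapping
            "bedrock-knowledge-base-default-vector": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"}
            },
            # textField and metadataField in the fieldMapping
            "AMAZON_BEDROCK_TEXT_CHUNK": {"type": "text"},
            "AMAZON_BEDROCK_METADATA": {"type": "text", "index": "false"}
        }
    }
}
# The helper would then create the index on the collection, e.g.:
# self.oss_client.indices.create(index=index_name, body=index_body)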
+        print("========================================================================================")
+        print("Step 6 - Creating Knowledge Base with OpenSearch Serverless")
+
+        # The embedding model used by Bedrock to embed ingested documents and realtime prompts
+        embedding_model_arn = f"arn:aws:bedrock:{self.region_name}::foundation-model/{embedding_model}"
+
+        # Vector field dimension per embedding model. Note: this value is not
+        # referenced below; the actual index dimension is set in create_vector_index
+        if "cohere" in embedding_model:
+            vector_field_dimension = 1536
+        else:  # Titan models
+            vector_field_dimension = 1024
+
+        # OpenSearch Serverless configuration
+        opensearch_serverless_configuration = {
+            "collectionArn": collection_arn,
+            "vectorIndexName": index_name,
+            "fieldMapping": {
+                "vectorField": "bedrock-knowledge-base-default-vector",
+                "textField": "AMAZON_BEDROCK_TEXT_CHUNK",
+                "metadataField": "AMAZON_BEDROCK_METADATA"
+            }
+        }
+
+        # Ingestion strategy - how to chunk documents from the data source
+        # (150-token chunks with 20% overlap share roughly 30 tokens between
+        # consecutive chunks)
+        chunking_strategy_configuration = {
+            "chunkingStrategy": "FIXED_SIZE",
+            "fixedSizeChunkingConfiguration": {
+                "maxTokens": 150,
+                "overlapPercentage": 20
+            }
+        }
+
+        # The data source to ingest documents from
+        s3_configuration = {
+            "bucketArn": f"arn:aws:s3:::{data_bucket_name}",
+        }
+
+        logger.info("Creating knowledge base with OpenSearch Serverless configuration...")
+        logger.info(f"Knowledge base name: {kb_name}")
+        logger.info(f"Embedding model: {embedding_model}")
+        logger.info(f"Collection ARN: {collection_arn}")
+        logger.info(f"Index name: {index_name}")
+
+        try:
+            logger.info("Attempting to create knowledge base with OpenSearch Serverless...")
+            create_kb_response = self.bedrock_agent_client.create_knowledge_base(
+                name=kb_name,
+                description=kb_description if kb_description else f"Knowledge base for {kb_name}",
+                roleArn=bedrock_kb_execution_role['Role']['Arn'],
+                knowledgeBaseConfiguration={
+                    "type": "VECTOR",
+                    "vectorKnowledgeBaseConfiguration": {
+                        "embeddingModelArn": embedding_model_arn
+                    }
+                },
+                storageConfiguration={
+                    "type": "OPENSEARCH_SERVERLESS",
+                    "opensearchServerlessConfiguration": opensearch_serverless_configuration
+                }
+            )
+            kb = create_kb_response["knowledgeBase"]
+            logger.info(f"✓ Successfully created knowledge base with ID: {kb['knowledgeBaseId']}")
+            print(f"Created knowledge base with ID: {kb['knowledgeBaseId']}")
+        except self.bedrock_agent_client.exceptions.ConflictException:
+            logger.info(f"Knowledge base {kb_name} already exists, retrieving it")
+            print(f"Knowledge base {kb_name} already exists, retrieving it")
+            try:
+                kbs = self.bedrock_agent_client.list_knowledge_bases(maxResults=100)
+                kb_id = None
+                for existing_kb in kbs['knowledgeBaseSummaries']:
+                    if existing_kb['name'] == kb_name:
+                        kb_id = existing_kb['knowledgeBaseId']
+                        break
+
+                if kb_id:
+                    response = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
+                    kb = response['knowledgeBase']
+                    logger.info(f"✓ Retrieved existing knowledge base with ID: {kb['knowledgeBaseId']}")
+                    print(f"Retrieved existing knowledge base with ID: {kb['knowledgeBaseId']}")
+                else:
+                    logger.error(f"✗ Could not find existing knowledge base with name {kb_name}")
+                    raise Exception(f"Could not find existing knowledge base with name {kb_name}")
+            except ClientError as e:
+                logger.error(f"✗ Failed to retrieve existing knowledge base: {e}")
+                raise Exception(f"Failed to retrieve existing knowledge base: {e}")
+        except self.bedrock_agent_client.exceptions.ValidationException as e:
+            logger.error(f"✗ Validation error creating knowledge base: {e}")
+            error_message = str(e)
+            if "storageConfiguration.type" in error_message:
+                logger.error("Storage configuration validation error detected")
+                logger.error("Supported storage types: [RDS, OPENSEARCH_SERVERLESS, PINECONE, MONGO_DB_ATLAS, NEPTUNE_ANALYTICS, REDIS_ENTERPRISE_CLOUD]")
+                logger.error("Current configuration uses: OPENSEARCH_SERVERLESS")
+            elif "opensearchServerlessConfiguration" in error_message:
+                logger.error("OpenSearch Serverless configuration error detected")
+                logger.error(f"Collection ARN: {collection_arn}")
+                logger.error(f"Index name: {index_name}")
+            print(f"✗ Validation error: {e}")
+            raise Exception(f"Knowledge base validation error: {e}")
+        except ClientError as e:
+            logger.error(f"✗ AWS client error creating knowledge base: {e}")
+            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
+            if error_code == 'AccessDeniedException':
+                logger.error("Access denied - check IAM permissions for Bedrock and OpenSearch Serverless")
+                logger.error("Required permissions: bedrock:CreateKnowledgeBase, aoss:APIAccessAll")
+            elif error_code == 'ResourceNotFoundException':
+                logger.error("Resource not found - check if the collection ARN exists and is accessible")
+                logger.error(f"Collection ARN: {collection_arn}")
+            print(f"✗ AWS client error: {e}")
+            raise Exception(f"AWS client error creating knowledge base: {e}")
+        except Exception as e:
+            logger.error(f"✗ Unexpected error creating knowledge base: {e}")
+            print(f"Error creating knowledge base: {e}")
+            raise
status == "FAILED": + logger.error(f"✗ Knowledge base creation failed") + raise Exception(f"Knowledge base creation failed") + elif status_check_count >= max_status_checks: + logger.error(f"✗ Timeout waiting for knowledge base to be available after {max_status_checks * 10} seconds") + raise Exception(f"Timeout waiting for knowledge base to be available") + + interactive_sleep(10) + + except ClientError as e: + logger.error(f"✗ Error checking knowledge base status: {e}") + raise Exception(f"Error checking knowledge base status: {e}") + except Exception as e: + logger.error(f"✗ Unexpected error waiting for knowledge base: {e}") + raise + + print("========================================================================================") + print(f"Step 7 - Creating Data Source for Knowledge Base") + + # Create the data source + logger.info(f"Creating data source for knowledge base: {kb_id}") + logger.info(f"S3 bucket ARN: {s3_configuration['bucketArn']}") + + try: + logger.info("Attempting to create data source...") + create_ds_response = self.bedrock_agent_client.create_data_source( + name=f"{kb_name}-source", + description=f"Data source for {kb_name}", + knowledgeBaseId=kb_id, + dataDeletionPolicy="RETAIN", + dataSourceConfiguration={ + "type": "S3", + "s3Configuration": s3_configuration + }, + vectorIngestionConfiguration={ + "chunkingConfiguration": chunking_strategy_configuration + } + ) + ds = create_ds_response["dataSource"] + logger.info(f"✓ Successfully created data source with ID: {ds['dataSourceId']}") + print(f"Created data source with ID: {ds['dataSourceId']}") + except self.bedrock_agent_client.exceptions.ConflictException: + logger.info(f"Data source for knowledge base {kb_id} already exists, retrieving it") + print(f"Data source for {kb_name} already exists, retrieving it") + try: + ds_available = self.bedrock_agent_client.list_data_sources( + knowledgeBaseId=kb_id, + maxResults=100 + ) + + if ds_available['dataSourceSummaries']: + ds_id = ds_available['dataSourceSummaries'][0]['dataSourceId'] + get_ds_response = self.bedrock_agent_client.get_data_source( + dataSourceId=ds_id, + knowledgeBaseId=kb_id + ) + ds = get_ds_response["dataSource"] + logger.info(f"✓ Retrieved existing data source with ID: {ds['dataSourceId']}") + print(f"Retrieved existing data source with ID: {ds['dataSourceId']}") + else: + logger.error(f"✗ No data sources found for knowledge base {kb_id}") + raise Exception(f"Could not find existing data source for knowledge base {kb_name}") + except ClientError as e: + logger.error(f"✗ Failed to retrieve existing data source: {e}") + raise Exception(f"Failed to retrieve existing data source: {e}") + except self.bedrock_agent_client.exceptions.ValidationException as e: + logger.error(f"✗ Validation error creating data source: {e}") + error_message = str(e) + if "s3Configuration" in error_message: + logger.error("S3 configuration validation error detected") + logger.error(f"S3 bucket ARN: {s3_configuration['bucketArn']}") + print(f"✗ Validation error: {e}") + raise Exception(f"Data source validation error: {e}") + except ClientError as e: + logger.error(f"✗ AWS client error creating data source: {e}") + error_code = e.response.get('Error', {}).get('Code', 'Unknown') + if error_code == 'AccessDeniedException': + logger.error("Access denied - check IAM permissions for S3 bucket access") + logger.error(f"Required permissions for bucket: {data_bucket_name}") + elif error_code == 'ResourceNotFoundException': + logger.error("Resource not found - check if knowledge base 
     def delete_s3(self, bucket_name: str):
         """
         Delete the objects contained in the Knowledge Base S3 bucket.
@@ -892,4 +1953,4 @@ def delete_s3(self, bucket_name: str):
         if 'Contents' in objects:
             for obj in objects['Contents']:
                 self.s3_client.delete_object(Bucket=bucket_name, Key=obj['Key'])
-        self.s3_client.delete_bucket(Bucket=bucket_name)
\ No newline at end of file
+        self.s3_client.delete_bucket(Bucket=bucket_name)
diff --git a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/lambda_rewoo.py b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/lambda_rewoo.py
index 37807c4a7..d38ca2e06 100644
--- a/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/lambda_rewoo.py
+++ b/agents-and-function-calling/bedrock-agents/features-examples/14-create-agent-with-custom-orchestration/lambda_rewoo.py
@@ -5,6 +5,10 @@ import re
 import uuid
 import xml.etree.ElementTree as ET
+import boto3
+
+# Ensure all boto3 clients use us-west-2 region
+boto3.setup_default_session(region_name="us-west-2")
 
 tool_state = {
     "plan": None,
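# Aside: an alternative to boto3.setup_default_session for pinning the region
# is an environment variable, which keeps the region out of code and is
# honored by any client created after the variable is set (a sketch, not part
# of this PR):
import os
os.environ.setdefault("AWS_DEFAULT_REGION", "us-west-2")
# Any subsequent boto3.client(...) call without an explicit region_name will
# now resolve to us-west-2 unless the caller's environment already set one.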