Skip to content
This repository was archived by the owner on Sep 7, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
| 6 | GameLift MCP Server | 使用MCP协议来获取当前账户的GameLift相关信息 | seanguo@ yuzp@ | [gamelift-mcp-server](gamelift-mcp-server/README.md) |
| 7 | S3 Upload Server | 上传文件到S3并返回公共访问链接的MCP服务器 | hcihang@ | [s3-upload-server](s3_upload_server/README.md) |
| 8 | AOS MCP Server | 基于Serverless架构的Amazon OpenSearch MCP服务器,支持向量搜索和知识注入 | tangaws@ | [aos-mcp-serverless](aos-mcp-serverless/serverless-mcp-setup/README.md) |
| 9 | S3 Upload Server for AgentCore MCP runtime | S3文件上传MCPserver部署至AgentCore的方法 | runpeng@ | [s3-upload-server-for-agentcore](s3_upload_server_for_agentcore/README.md) |

## Demo MCP on Amazon Bedrock
推荐Bedrock MCP Demo:
Expand Down
9 changes: 9 additions & 0 deletions s3_upload_server_for_agentcore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
You can use this to deploy the S3 upload MCP server to the Amazon Bedrock AgentCore MCP runtime,
so that it can be used as a remote MCP server.

Procedures:
1. Start a Python virtual environment and run pip install -r requirements.txt
2. cd to the folder s3_upload_for_agentcore
3. run python remote_deploy.py
4. The deployment to your account runs automatically (please make sure you have sufficient permissions)
5. Clean up resources with python clean-up.py
75 changes: 75 additions & 0 deletions s3_upload_server_for_agentcore/clean-up.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from dotenv import load_dotenv
import os
import boto3

# Tear down every resource created by remote_deploy.py. The deploy script
# persists the resource identifiers in .cleanup_info; load them here.
load_dotenv(dotenv_path=".cleanup_info")

agentID = os.getenv("agent_id")
repoName = os.getenv("repo_name")
roleName = os.getenv("role_name")
ssmName = os.getenv("ssm_name")
secretID = os.getenv("secret_id")

print("🗑️ Starting cleanup process...")

# Region must match where remote_deploy.py created the resources; allow an
# environment override but keep the original default of us-east-1.
region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
agentcore_control_client = boto3.client('bedrock-agentcore-control', region_name=region)
ecr_client = boto3.client('ecr', region_name=region)
iam_client = boto3.client('iam', region_name=region)  # IAM is a global service; region is harmless here
ssm_client = boto3.client('ssm', region_name=region)
secrets_client = boto3.client('secretsmanager', region_name=region)

try:
    print("Deleting AgentCore Runtime...")
    agentcore_control_client.delete_agent_runtime(
        agentRuntimeId=agentID,
    )
    print("✓ AgentCore Runtime deletion initiated")

    print("Deleting ECR repository...")
    # force=True also deletes any container images still in the repository.
    ecr_client.delete_repository(
        repositoryName=repoName,
        force=True
    )
    print("✓ ECR repository deleted")

    print("Deleting IAM role policies...")
    # Inline policies must be removed before the role itself can be deleted.
    # Use a paginator: a single call capped by MaxItems would silently miss
    # policies beyond the first page.
    for page in iam_client.get_paginator('list_role_policies').paginate(RoleName=roleName):
        for policy_name in page['PolicyNames']:
            iam_client.delete_role_policy(
                RoleName=roleName,
                PolicyName=policy_name
            )

    iam_client.delete_role(
        RoleName=roleName
    )
    print("✓ IAM role deleted")

    try:
        ssm_client.delete_parameter(Name=ssmName)
        print("✓ Parameter Store parameter deleted")
    except ssm_client.exceptions.ParameterNotFound:
        print("ℹ️ Parameter Store parameter not found")

    try:
        secrets_client.delete_secret(
            SecretId=secretID,
            ForceDeleteWithoutRecovery=True
        )
        print("✓ Secrets Manager secret deleted")
    except secrets_client.exceptions.ResourceNotFoundException:
        print("ℹ️ Secrets Manager secret not found")

    print("\n✅ Cleanup completed successfully!")

except Exception as e:
    print(f"❌ Error during cleanup: {e}")
    print("You may need to manually clean up some resources.")
192 changes: 192 additions & 0 deletions s3_upload_server_for_agentcore/mcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
import json
import mimetypes
import os
from typing import Optional, Dict, Any
from mcp.server.fastmcp import FastMCP, Context
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import logging

# Constants
DEFAULT_AWS_REGION = 'us-east-1'  # fallback when no region env var is set

# Required AWS permissions:
# - sts:GetCallerIdentity
# - s3:CreateBucket
# - s3:ListBucket
# - s3:PutObject
# - s3:GetObject (for presigned URL generation)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Presigned-URL lifetime in hours (default 144h = 6 days). SigV4 presigned
# URLs cannot exceed 7 days, so EXPIRE_HOURS must stay <= 168.
expire_hours = int(os.environ.get('EXPIRE_HOURS',144))

# Bind on all interfaces in stateless HTTP mode.
# NOTE(review): presumably stateless mode is what lets the AgentCore MCP
# runtime route each request independently — confirm against its docs.
mcp = FastMCP(host="0.0.0.0", stateless_http=True)

def get_aws_credentials() -> Optional[Dict[str, str]]:
    """Build a boto3 credential kwargs dict from environment variables.

    Returns:
        A mapping with 'aws_access_key_id', 'aws_secret_access_key' and,
        when AWS_SESSION_TOKEN is set, 'aws_session_token'; or None when
        either mandatory variable is missing (callers then fall back to
        boto3's default credential chain).
    """
    access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    session_token = os.environ.get('AWS_SESSION_TOKEN')  # optional, for temporary credentials
    # Both long-term keys are required to use explicit credentials at all.
    if not access_key or not secret_key:
        return None

    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_key
    }

    if session_token:
        credentials['aws_session_token'] = session_token

    return credentials

def get_aws_region() -> str:
    """Resolve the AWS region to use.

    Precedence: AWS_DEFAULT_REGION, then AWS_REGION, then the module-wide
    DEFAULT_AWS_REGION fallback.
    """
    for env_var in ('AWS_DEFAULT_REGION', 'AWS_REGION'):
        value = os.environ.get(env_var)
        if value is not None:
            return value
    return DEFAULT_AWS_REGION

def get_account_id(region: str = None) -> str:
    """Return the AWS account ID of the active credentials via STS.

    Uses explicit env-var credentials when available, otherwise boto3's
    default chain. Raises ValueError when credentials are missing or the
    STS call fails.
    """
    try:
        # Assemble client kwargs instead of branching over every combination.
        client_kwargs = {}
        explicit_creds = get_aws_credentials()
        if explicit_creds:
            client_kwargs.update(explicit_creds)
        if region:
            client_kwargs['region_name'] = region
        sts_client = boto3.client('sts', **client_kwargs)
        return sts_client.get_caller_identity()['Account']
    except NoCredentialsError as e:
        raise ValueError(f"AWS credentials not configured. Please configure AWS credentials {str(e)}.")
    except Exception as e:
        raise ValueError(f"Failed to get AWS account ID: {str(e)}")

def get_content_type(file_name: str) -> str:
    """Guess a MIME type for *file_name*, defaulting to octet-stream.

    Text-like types get an explicit UTF-8 charset appended so Unicode
    content renders correctly when fetched from S3.
    """
    guessed, _ = mimetypes.guess_type(file_name)
    content_type = guessed or 'application/octet-stream'

    text_like_prefixes = ('text/', 'application/json', 'application/xml', 'application/javascript')
    if content_type.startswith(text_like_prefixes):
        if 'charset' not in content_type:
            content_type = content_type + '; charset=utf-8'
    elif content_type == 'text/html' or file_name.lower().endswith('.html'):
        # Covers names whose extension isn't recognized by mimetypes
        # (e.g. a leading-dot file like ".html") but that are still HTML.
        content_type = 'text/html; charset=utf-8'

    return content_type

def create_bucket_if_not_exists(s3_client, bucket_name: str, region: str) -> bool:
    """Ensure *bucket_name* exists, creating it in *region* when absent.

    Returns True when the bucket already exists or was created. Raises
    ValueError for any S3 failure other than a plain 404 from the probe.
    """
    try:
        # Probe for existence; raises ClientError when the bucket is
        # missing or inaccessible.
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as probe_error:
        if probe_error.response['Error']['Code'] != '404':
            # 403, 301, etc. — we can't tell whether the bucket is usable.
            raise ValueError(f"Failed to check bucket {bucket_name}: {str(probe_error)}")
        try:
            if region == 'us-east-1':
                # us-east-1 rejects an explicit LocationConstraint.
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': region}
                )
            logger.info(f"Created bucket {bucket_name}")
            return True
        except ClientError as create_error:
            raise ValueError(f"Failed to create bucket {bucket_name}: {str(create_error)}")
    logger.info(f"Bucket {bucket_name} already exists")
    return True

def upload_file_to_s3(s3_client, bucket_name: str, file_name: str, file_content: str, folder: str = 'files') -> str:
    """Upload text content to S3 and return a presigned GET URL.

    The URL expires after `expire_hours` hours (module-level setting from
    EXPIRE_HOURS, default 144h = 6 days; SigV4 presigned URLs are capped
    at 7 days).

    Args:
        s3_client: A boto3 S3 client.
        bucket_name: Destination bucket (assumed to already exist).
        file_name: Object name, appended under *folder*.
        file_content: File body as a Unicode string.
        folder: Key prefix inside the bucket (default 'files').

    Returns:
        The presigned URL for downloading the uploaded object.

    Raises:
        ValueError: When the upload fails with an S3 ClientError.
    """
    try:
        # Object key is "<folder>/<file_name>"
        s3_key = f"{folder}/{file_name}"

        # Content type drives how browsers render the downloaded object
        content_type = get_content_type(file_name)

        # Encode content as UTF-8 bytes
        file_bytes = file_content.encode('utf-8')

        # Upload file with proper content type
        s3_client.put_object(
            Bucket=bucket_name,
            Key=s3_key,
            Body=file_bytes,
            ContentType=content_type
        )

        # Presigned URL valid for expire_hours hours (hours -> seconds);
        # the computed value must stay <= 604800s (the 7-day SigV4 maximum)
        presigned_url = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': s3_key},
            ExpiresIn=3600*expire_hours
        )

        logger.info(f"Successfully uploaded {file_name} with content type {content_type} and generated presigned URL")
        return presigned_url

    except ClientError as e:
        raise ValueError(f"Failed to upload file to S3: {str(e)}")

@mcp.tool()
def upload_file(file_name: str, file_content: str) -> str:
    """Upload a file to S3 bucket and return a presigned URL with expiration

    Args:
        file_name: Name of the file to upload (including extension)
        file_content: Content of the file as a string

    Returns:
        presigned S3 URL of the uploaded file
    """
    # NOTE: the docstring above is exposed by FastMCP as the tool
    # description, so it is kept verbatim.
    try:
        region = get_aws_region()
        credentials = get_aws_credentials()

        # Bucket is per-account: agentcore-demo-<account id>
        account_id = get_account_id(region)
        bucket_name = f"agentcore-demo-{account_id}"

        # Build the S3 client, passing explicit credentials when present.
        client_kwargs = {'region_name': region}
        if credentials:
            client_kwargs.update(credentials)
        s3_client = boto3.client('s3', **client_kwargs)

        create_bucket_if_not_exists(s3_client, bucket_name, region)
        return upload_file_to_s3(s3_client, bucket_name, file_name, file_content)

    except Exception as e:
        logger.error(f"Error uploading file: {str(e)}")
        # Tool contract is a string; encode failures as a JSON payload.
        return json.dumps({
            "success": False,
            "error": str(e)
        }, ensure_ascii=False)

if __name__ == "__main__":
    # Serve the MCP tools over the streamable-HTTP transport.
    mcp.run(transport="streamable-http")
Loading