From 053e43184b5d36c4c16ac0de79fc7af875ebf459 Mon Sep 17 00:00:00 2001 From: Inderpreet Singh Date: Mon, 7 Jul 2025 17:12:08 -0700 Subject: [PATCH 1/6] Adding migration mcp server under list of mcp servers Adding migration mcp server under list of mcp servers --- .../documentdb-migration-mcp/.gitignore | 19 + mcp-server/documentdb-migration-mcp/README.md | 216 ++++++ .../awslabs/__init__.py | 4 + .../__init__.py | 2 + .../boundary_tools.py | 219 ++++++ .../cdc_tools.py | 275 ++++++++ .../full_load_tools.py | 342 +++++++++ .../index_tools.py | 389 +++++++++++ .../scripts/cdc-multiprocess.py | 660 ++++++++++++++++++ .../scripts/documentdb_index_tool.py | 643 +++++++++++++++++ .../scripts/fl-multiprocess-filtered.py | 273 ++++++++ .../scripts/fl-multiprocess.py | 326 +++++++++ .../documentdb_migration_mcp_server/server.py | 156 +++++ 13 files changed, 3524 insertions(+) create mode 100644 mcp-server/documentdb-migration-mcp/.gitignore create mode 100644 mcp-server/documentdb-migration-mcp/README.md create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/__init__.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/boundary_tools.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/cdc-multiprocess.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess-filtered.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess.py create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py diff --git a/mcp-server/documentdb-migration-mcp/.gitignore b/mcp-server/documentdb-migration-mcp/.gitignore new file mode 100644 index 0000000..f397032 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/.gitignore @@ -0,0 +1,19 @@ +# Python cache files +*.py[cod] + +# Build artifacts +dist/ +build/ +*.egg-info/ + +# UV package files +uv-package/ +.uv/ + +# IDE files +.vscode/ +.idea/ + +# OS specific files +.DS_Store +Thumbs.db diff --git a/mcp-server/documentdb-migration-mcp/README.md b/mcp-server/documentdb-migration-mcp/README.md new file mode 100644 index 0000000..35b4126 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/README.md @@ -0,0 +1,216 @@ +# DocumentDB Migration MCP Server + +This MCP (Model Context Protocol) server provides tools for migrating data to DocumentDB. It wraps the existing DocumentDB migration tools into an MCP server interface, making them accessible through the MCP protocol. 
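+
+For example, once the server is configured, an MCP client invokes these tools through standard `tools/call` requests. The request below is an illustrative sketch only; the tool name and arguments shown here are documented in the sections that follow:
+
+```json
+{
+  "jsonrpc": "2.0",
+  "id": 1,
+  "method": "tools/call",
+  "params": {
+    "name": "getResumeToken",
+    "arguments": {
+      "source_uri": "mongodb://user:password@source-host:27017/?replicaSet=rs0",
+      "source_namespace": "appdb.orders"
+    }
+  }
+}
+```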
+ +## Features + +- **Full Load Migration**: Migrate data from a source database to DocumentDB in a one-time operation +- **Filtered Full Load Migration**: Migrate data with filtering based on TTL +- **Change Data Capture (CDC)**: Continuously replicate changes from a source database to DocumentDB +- **Resume Token Management**: Get change stream resume tokens for CDC operations +- **Automatic Boundary Generation**: Automatically generate optimal boundaries for segmenting collections during migration +- **Index Management**: Export, restore, and check compatibility of indexes between MongoDB and DocumentDB + +## Installation + +### 1. Through your favorite AI Agentic tool (e.g., for Amazon Q Developer CLI MCP, Claude, etc.) using uv package (Recommended) + +```json +{ + "documentdb-migration-mcp-server": { + "autoApprove": [], + "disabled": false, + "timeout": 60, + "command": "uvx", + "args": [ + "documentdb-migration-mcp-server@latest" + ], + "env": { + "FASTMCP_LOG_LEVEL": "INFO", + "AWS_PROFILE": "default", + "AWS_REGION": "us-east-1" + }, + "transportType": "stdio" + } +} +``` +You can customize the AWS profile and region by changing the `AWS_PROFILE` and `AWS_REGION` environment variables. + +### 2. Through your favorite AI Agentic tool using local file + +First, download the source code: + +```bash +# Clone the repository +git clone https://github.com/awslabs/documentdb-migration-mcp-server.git + +# Install dependencies +pip install pymongo boto3 fastmcp +``` + +Then configure your AI Agentic tool with: + +```json +{ + "documentdb-migration-mcp-server": { + "autoApprove": [], + "disabled": false, + "timeout": 60, + "command": "python3", + "args": [ + "-m", + "awslabs.documentdb_migration_mcp_server.server" + ], + "env": { + "FASTMCP_LOG_LEVEL": "INFO", + "AWS_PROFILE": "default", + "AWS_REGION": "us-east-1", + "PYTHONPATH": "/path/to/documentdb-migration-mcp-server" + }, + "transportType": "stdio" + } +} +``` + +> **Note:** Replace `/path/to/documentdb-migration-mcp-server` with the actual path to your local repository. + +### 3. Using bash + +```bash +# Install using uv package +uvx documentdb-migration-mcp-server@latest + +# Or run from source +git clone https://github.com/awslabs/documentdb-migration-mcp-server.git +cd documentdb-migration-mcp-server +pip install pymongo boto3 mcp-server +python -m awslabs.documentdb_migration_mcp_server.server +``` + +## MCP Tools + +### runFullLoad + +Run a full load migration from source to target. + +**Parameters:** +- `source_uri`: Source URI in MongoDB Connection String format +- `target_uri`: Target URI in MongoDB Connection String format +- `source_namespace`: Source Namespace as . +- `target_namespace`: (Optional) Target Namespace as ., defaults to source_namespace +- `boundaries`: (Optional) Comma-separated list of boundaries for segmenting. If not provided, boundaries will be auto-generated. +- `boundary_datatype`: (Optional) Datatype of boundaries (objectid, string, int). Auto-detected if boundaries are auto-generated. +- `max_inserts_per_batch`: Maximum number of inserts to include in a single batch +- `feedback_seconds`: Number of seconds between feedback output +- `dry_run`: Read source changes only, do not apply to target +- `verbose`: Enable verbose logging +- `create_cloudwatch_metrics`: Create CloudWatch metrics for monitoring +- `cluster_name`: Name of cluster for CloudWatch metrics + +### runFilteredFullLoad + +Run a filtered full load migration from source to target. 
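+
+A typical invocation might pass arguments like the following (connection strings and the namespace are illustrative placeholders; when `boundaries` is omitted, boundaries are auto-generated). The full parameter list follows below:
+
+```json
+{
+  "source_uri": "mongodb://user:password@source-host:27017/?replicaSet=rs0",
+  "target_uri": "mongodb://user:password@docdb-cluster:27017/?tls=true",
+  "source_namespace": "appdb.events",
+  "max_inserts_per_batch": 100,
+  "dry_run": false
+}
+```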
+ +**Parameters:** +- `source_uri`: Source URI in MongoDB Connection String format +- `target_uri`: Target URI in MongoDB Connection String format +- `source_namespace`: Source Namespace as . +- `target_namespace`: (Optional) Target Namespace as ., defaults to source_namespace +- `boundaries`: (Optional) Comma-separated list of boundaries for segmenting. If not provided, boundaries will be auto-generated. +- `boundary_datatype`: (Optional) Datatype of boundaries (objectid, string, int). Auto-detected if boundaries are auto-generated. +- `max_inserts_per_batch`: Maximum number of inserts to include in a single batch +- `feedback_seconds`: Number of seconds between feedback output +- `dry_run`: Read source changes only, do not apply to target +- `verbose`: Enable verbose logging + +### runCDC + +Run a CDC (Change Data Capture) migration from source to target. + +**Parameters:** +- `source_uri`: Source URI in MongoDB Connection String format +- `target_uri`: Target URI in MongoDB Connection String format +- `source_namespace`: Source Namespace as . +- `target_namespace`: (Optional) Target Namespace as ., defaults to source_namespace +- `start_position`: Starting position - 0 for all available changes, YYYY-MM-DD+HH:MM:SS in UTC, or change stream resume token +- `use_oplog`: Use the oplog as change data capture source (MongoDB only) +- `use_change_stream`: Use change streams as change data capture source (MongoDB or DocumentDB) +- `threads`: Number of threads (parallel processing) +- `duration_seconds`: Number of seconds to run before exiting, 0 = run forever +- `max_operations_per_batch`: Maximum number of operations to include in a single batch +- `max_seconds_between_batches`: Maximum number of seconds to await full batch +- `feedback_seconds`: Number of seconds between feedback output +- `dry_run`: Read source changes only, do not apply to target +- `verbose`: Enable verbose logging +- `create_cloudwatch_metrics`: Create CloudWatch metrics for monitoring +- `cluster_name`: Name of cluster for CloudWatch metrics + +### getResumeToken + +Get the current change stream resume token. + +**Parameters:** +- `source_uri`: Source URI in MongoDB Connection String format +- `source_namespace`: Source Namespace as . + +### generateBoundaries + +Generate boundaries for segmenting a collection during migration. + +**Parameters:** +- `uri`: MongoDB Connection String format URI +- `database`: Database name +- `collection`: Collection name +- `num_segments`: Number of segments to divide the collection into +- `use_single_cursor`: (Optional) Use a single cursor to scan the collection (slower but more reliable), defaults to false + +### dumpIndexes + +Dump indexes from a MongoDB or DocumentDB instance. + +**Parameters:** +- `uri`: URI to connect to MongoDB or Amazon DocumentDB +- `output_dir`: (Optional) Directory to export indexes to. If not provided, a temporary directory will be created. +- `dry_run`: (Optional) Perform processing, but do not actually export indexes +- `debug`: (Optional) Output debugging information + +### restoreIndexes + +Restore indexes to an Amazon DocumentDB instance. 
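+
+For example, an agent might call this tool with arguments like the following (the URI and directory are illustrative placeholders; `index_dir` is typically the `output_dir` returned by `dumpIndexes`). The full parameter list follows below:
+
+```json
+{
+  "uri": "mongodb://user:password@docdb-cluster:27017/?tls=true",
+  "index_dir": "/tmp/index_dump_example",
+  "dry_run": true
+}
+```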
+ +**Parameters:** +- `uri`: URI to connect to Amazon DocumentDB +- `index_dir`: Directory containing index metadata to restore from +- `skip_incompatible`: (Optional) Skip incompatible indexes when restoring metadata, defaults to true +- `support_2dsphere`: (Optional) Support 2dsphere indexes creation, defaults to false +- `dry_run`: (Optional) Perform processing, but do not actually restore indexes +- `debug`: (Optional) Output debugging information +- `shorten_index_name`: (Optional) Shorten long index name to compatible length, defaults to true +- `skip_id_indexes`: (Optional) Do not create _id indexes, defaults to true + +### showIndexCompatibilityIssues + +Show compatibility issues with Amazon DocumentDB. + +**Parameters:** +- `index_dir`: Directory containing index metadata to check +- `debug`: (Optional) Output debugging information + +### showCompatibleIndexes + +Show compatible indexes with Amazon DocumentDB. + +**Parameters:** +- `index_dir`: Directory containing index metadata to check +- `debug`: (Optional) Output debugging information + +## Requirements + +- Python 3.10+ +- PyMongo +- Boto3 (for CloudWatch metrics) +- MCP Server + +## License + +This project is licensed under the Apache License 2.0 - see the LICENSE file for details. \ No newline at end of file diff --git a/mcp-server/documentdb-migration-mcp/awslabs/__init__.py b/mcp-server/documentdb-migration-mcp/awslabs/__init__.py new file mode 100644 index 0000000..d77b263 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/__init__.py @@ -0,0 +1,4 @@ + +"""AWS Labs namespace package.""" + +__path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py new file mode 100644 index 0000000..cfd66f6 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py @@ -0,0 +1,2 @@ + +"""AWS Labs DocumentDB Migration MCP Server package.""" diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/boundary_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/boundary_tools.py new file mode 100644 index 0000000..c9dcba4 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/boundary_tools.py @@ -0,0 +1,219 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. 
+ +"""Boundary generation tools for DocumentDB Migration MCP Server.""" + +import os +import sys +import time +import pymongo +import warnings +from datetime import datetime +from typing import Annotated, Any, Dict, List, Optional +from pydantic import Field +from loguru import logger + + +async def generate_boundaries( + uri: Annotated[ + str, + Field( + description='MongoDB Connection String format URI' + ), + ], + database: Annotated[ + str, + Field( + description='Database name' + ), + ], + collection: Annotated[ + str, + Field( + description='Collection name' + ), + ], + num_segments: Annotated[ + int, + Field( + description='Number of segments to divide the collection into' + ), + ], + use_single_cursor: Annotated[ + bool, + Field( + description='Use a single cursor to scan the collection (slower but more reliable)' + ), + ] = False, +) -> Dict[str, Any]: + """Generate boundaries for segmenting a collection during migration. + + This tool analyzes a collection and generates boundary values that can be used + to divide the collection into segments for parallel migration. It uses the + DMS Segment Analyzer approach to find optimal boundaries. + + Returns: + Dict[str, Any]: Generated boundaries and related information + """ + logger.info(f"Generating boundaries for {database}.{collection} with {num_segments} segments") + + # Suppress DocumentDB connection warnings + warnings.filterwarnings("ignore", "You appear to be connected to a DocumentDB cluster.") + + # Check for mixed or unsupported ID types + supported_id_types = ['int', 'string', 'objectId'] + + try: + client = pymongo.MongoClient(host=uri, appname='mcp-boundary-gen') + db = client[database] + col = db[collection] + + # Check collection stats + coll_stats = db.command("collStats", collection) + num_documents = coll_stats['count'] + + if num_documents == 0: + return { + "success": False, + "message": f"Collection {database}.{collection} is empty", + "boundaries": [], + "boundary_datatype": None + } + + # Check ID types + id_type_first = col.aggregate([ + {"$sort": {"_id": pymongo.ASCENDING}}, + {"$project": {"_id": False, "idType": {"$type": "$_id"}}}, + {"$limit": 1} + ]).next()['idType'] + + id_type_last = col.aggregate([ + {"$sort": {"_id": pymongo.DESCENDING}}, + {"$project": {"_id": False, "idType": {"$type": "$_id"}}}, + {"$limit": 1} + ]).next()['idType'] + + if id_type_first not in supported_id_types: + return { + "success": False, + "message": f"Unsupported data type '{id_type_first}' for _id field. Only {supported_id_types} are supported.", + "boundaries": [], + "boundary_datatype": None + } + + if id_type_last not in supported_id_types: + return { + "success": False, + "message": f"Unsupported data type '{id_type_last}' for _id field. 
Only {supported_id_types} are supported.", + "boundaries": [], + "boundary_datatype": None + } + + if id_type_first != id_type_last: + return { + "success": False, + "message": f"Mixed data types '{id_type_first}' and '{id_type_last}' for _id field.", + "boundaries": [], + "boundary_datatype": None + } + + # Map MongoDB type names to our boundary datatypes + boundary_datatype = "objectid" if id_type_first == "objectId" else id_type_first.lower() + + # Generate boundaries + boundary_list = [] + num_boundaries = num_segments - 1 + docs_per_segment = int(num_documents / num_segments) + + logger.info(f"Collection contains {num_documents} documents") + logger.info(f"Generating {num_boundaries} boundaries with ~{docs_per_segment} documents per segment") + + query_start_time = time.time() + + if use_single_cursor: + # Use cursor method (slower but more reliable) + cursor = col.find( + filter=None, + projection={"_id": True}, + sort=[("_id", pymongo.ASCENDING)] + ) + + docs_processed = 0 + docs_in_segment = 0 + boundary_count = 0 + + for doc in cursor: + docs_processed += 1 + docs_in_segment += 1 + + if docs_in_segment >= docs_per_segment: + docs_in_segment = 0 + boundary_count += 1 + boundary_list.append(doc["_id"]) + logger.info(f"Found boundary {boundary_count}: {doc['_id']}") + + if boundary_count >= num_boundaries: + break + else: + # Use skip method (faster but may timeout on large collections) + # Get the first _id + first_doc = col.find_one( + filter=None, + projection={"_id": True}, + sort=[("_id", pymongo.ASCENDING)] + ) + + current_id = first_doc["_id"] + + for i in range(num_boundaries): + current_doc = col.find_one( + filter={"_id": {"$gt": current_id}}, + projection={"_id": True}, + sort=[("_id", pymongo.ASCENDING)], + skip=docs_per_segment + ) + + if current_doc is None: + # We've reached the end of the collection + break + + current_id = current_doc["_id"] + boundary_list.append(current_id) + logger.info(f"Found boundary {i+1}: {current_id}") + + query_elapsed_secs = int(time.time() - query_start_time) + logger.info(f"Boundary generation completed in {query_elapsed_secs} seconds") + + # Convert boundaries to strings for consistent return format + boundary_strings = [str(b) for b in boundary_list] + boundaries_csv = ",".join(boundary_strings) + + client.close() + + return { + "success": True, + "message": f"Successfully generated {len(boundary_list)} boundaries", + "boundaries": boundary_strings, + "boundaries_csv": boundaries_csv, + "boundary_datatype": boundary_datatype, + "num_documents": num_documents, + "docs_per_segment": docs_per_segment, + "execution_time_seconds": query_elapsed_secs + } + + except Exception as e: + logger.error(f"Error generating boundaries: {str(e)}") + return { + "success": False, + "message": f"Failed to generate boundaries: {str(e)}", + "boundaries": [], + "boundary_datatype": None + } diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py new file mode 100644 index 0000000..eb1848b --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py @@ -0,0 +1,275 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. 
A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. + +"""CDC migration tools for DocumentDB Migration MCP Server.""" + +import os +import sys +import time +import subprocess +from datetime import datetime +from typing import Annotated, Any, Dict, List, Optional +from pydantic import Field +from loguru import logger + + +async def run_cdc( + source_uri: Annotated[ + str, + Field( + description='Source URI in MongoDB Connection String format' + ), + ], + target_uri: Annotated[ + str, + Field( + description='Target URI in MongoDB Connection String format' + ), + ], + source_namespace: Annotated[ + str, + Field( + description='Source Namespace as .' + ), + ], + target_namespace: Annotated[ + Optional[str], + Field( + description='Target Namespace as ., defaults to source_namespace' + ), + ] = None, + start_position: Annotated[ + str, + Field( + description='Starting position - 0 for all available changes, YYYY-MM-DD+HH:MM:SS in UTC, or change stream resume token' + ), + ] = "0", + use_oplog: Annotated[ + bool, + Field( + description='Use the oplog as change data capture source (MongoDB only)' + ), + ] = False, + use_change_stream: Annotated[ + bool, + Field( + description='Use change streams as change data capture source (MongoDB or DocumentDB)' + ), + ] = False, + threads: Annotated[ + int, + Field( + description='Number of threads (parallel processing)' + ), + ] = 1, + duration_seconds: Annotated[ + int, + Field( + description='Number of seconds to run before exiting, 0 = run forever' + ), + ] = 0, + max_operations_per_batch: Annotated[ + int, + Field( + description='Maximum number of operations to include in a single batch' + ), + ] = 100, + max_seconds_between_batches: Annotated[ + int, + Field( + description='Maximum number of seconds to await full batch' + ), + ] = 5, + feedback_seconds: Annotated[ + int, + Field( + description='Number of seconds between feedback output' + ), + ] = 60, + dry_run: Annotated[ + bool, + Field( + description='Read source changes only, do not apply to target' + ), + ] = False, + verbose: Annotated[ + bool, + Field( + description='Enable verbose logging' + ), + ] = False, + create_cloudwatch_metrics: Annotated[ + bool, + Field( + description='Create CloudWatch metrics for monitoring' + ), + ] = False, + cluster_name: Annotated[ + Optional[str], + Field( + description='Name of cluster for CloudWatch metrics' + ), + ] = None, +) -> Dict[str, Any]: + """Run a CDC (Change Data Capture) migration from source to target. + + This tool executes a CDC migration from a source DocumentDB/MongoDB database + to a target DocumentDB database. It uses the cdc-multiprocess.py script to perform + the migration with the specified parameters. 
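+
+    Example (connection strings and namespace below are illustrative placeholders):
+        result = await run_cdc(
+            source_uri="mongodb://user:password@source-host:27017/?replicaSet=rs0",
+            target_uri="mongodb://user:password@docdb-cluster:27017/?tls=true",
+            source_namespace="appdb.orders",
+            use_change_stream=True,
+            start_position="2025-01-01+00:00:00",
+            threads=4,
+        )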
+ + Returns: + Dict[str, Any]: Status of the migration operation + """ + logger.info(f"Starting CDC migration from {source_namespace} to {target_namespace or source_namespace}") + + # Validate parameters + if create_cloudwatch_metrics and not cluster_name: + raise ValueError("Must supply cluster_name when capturing CloudWatch metrics") + + if not use_oplog and not use_change_stream: + raise ValueError("Must supply either use_oplog=True or use_change_stream=True") + + if use_oplog and use_change_stream: + raise ValueError("Cannot supply both use_oplog=True and use_change_stream=True") + + if use_change_stream and start_position == "0": + raise ValueError("start_position must be supplied as YYYY-MM-DD+HH:MM:SS in UTC or resume token when using change streams") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "cdc-multiprocess.py") + + cmd = [ + "python3", + script_path, + "--source-uri", source_uri, + "--target-uri", target_uri, + "--source-namespace", source_namespace, + "--start-position", start_position, + "--threads", str(threads), + "--duration-seconds", str(duration_seconds), + "--max-operations-per-batch", str(max_operations_per_batch), + "--max-seconds-between-batches", str(max_seconds_between_batches), + "--feedback-seconds", str(feedback_seconds), + ] + + if target_namespace: + cmd.extend(["--target-namespace", target_namespace]) + + if use_oplog: + cmd.append("--use-oplog") + + if use_change_stream: + cmd.append("--use-change-stream") + + if dry_run: + cmd.append("--dry-run") + + if verbose: + cmd.append("--verbose") + + if create_cloudwatch_metrics: + cmd.append("--create-cloudwatch-metrics") + cmd.extend(["--cluster-name", cluster_name]) + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True + ) + + # Process is running in the background + return { + "success": True, + "message": "CDC migration started successfully", + "process_id": process.pid, + "command": " ".join(cmd), + } + except Exception as e: + logger.error(f"Error starting CDC migration: {str(e)}") + raise ValueError(f"Failed to start CDC migration: {str(e)}") + + +async def get_resume_token( + source_uri: Annotated[ + str, + Field( + description='Source URI in MongoDB Connection String format' + ), + ], + source_namespace: Annotated[ + str, + Field( + description='Source Namespace as .' + ), + ], +) -> Dict[str, Any]: + """Get the current change stream resume token. + + This tool retrieves the current change stream resume token from the source database. + The resume token can be used as the start_position for a CDC migration. 
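+
+    Example (URI and namespace are illustrative placeholders); the returned
+    resume_token can then be passed as start_position to run_cdc:
+        token_info = await get_resume_token(
+            source_uri="mongodb://user:password@source-host:27017/?replicaSet=rs0",
+            source_namespace="appdb.orders",
+        )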
+ + Returns: + Dict[str, Any]: The resume token + """ + logger.info(f"Getting resume token for {source_namespace}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "cdc-multiprocess.py") + + cmd = [ + "python3", + script_path, + "--source-uri", source_uri, + "--target-uri", "mongodb://localhost:27017", # Dummy target URI, not used + "--source-namespace", source_namespace, + "--start-position", "NOW", + "--get-resume-token", + ] + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + # Parse output to extract resume token + output = result.stdout + resume_token = None + + for line in output.splitlines(): + if "Change stream resume token is" in line: + resume_token = line.split("Change stream resume token is")[1].strip() + break + + if resume_token: + return { + "success": True, + "resume_token": resume_token, + } + else: + raise ValueError("Failed to extract resume token from output") + except subprocess.CalledProcessError as e: + logger.error(f"Error getting resume token: {e.stderr}") + raise ValueError(f"Failed to get resume token: {e.stderr}") + except Exception as e: + logger.error(f"Error getting resume token: {str(e)}") + raise ValueError(f"Failed to get resume token: {str(e)}") diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py new file mode 100644 index 0000000..066b001 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py @@ -0,0 +1,342 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. + +"""Full Load migration tools for DocumentDB Migration MCP Server.""" + +import os +import sys +import time +import subprocess +from datetime import datetime +from typing import Annotated, Any, Dict, List, Optional +from pydantic import Field +from loguru import logger +from awslabs.documentdb_migration_mcp_server.boundary_tools import generate_boundaries + + +async def run_full_load( + source_uri: Annotated[ + str, + Field( + description='Source URI in MongoDB Connection String format' + ), + ], + target_uri: Annotated[ + str, + Field( + description='Target URI in MongoDB Connection String format' + ), + ], + source_namespace: Annotated[ + str, + Field( + description='Source Namespace as .' 
+ ), + ], + target_namespace: Annotated[ + Optional[str], + Field( + description='Target Namespace as ., defaults to source_namespace' + ), + ] = None, + boundaries: Annotated[ + str, + Field( + description='Comma-separated list of boundaries for segmenting' + ), + ] = None, + boundary_datatype: Annotated[ + str, + Field( + description='Datatype of boundaries (objectid, string, int)' + ), + ] = 'objectid', + max_inserts_per_batch: Annotated[ + int, + Field( + description='Maximum number of inserts to include in a single batch' + ), + ] = 100, + feedback_seconds: Annotated[ + int, + Field( + description='Number of seconds between feedback output' + ), + ] = 60, + dry_run: Annotated[ + bool, + Field( + description='Read source changes only, do not apply to target' + ), + ] = False, + verbose: Annotated[ + bool, + Field( + description='Enable verbose logging' + ), + ] = False, + create_cloudwatch_metrics: Annotated[ + bool, + Field( + description='Create CloudWatch metrics for monitoring' + ), + ] = False, + cluster_name: Annotated[ + Optional[str], + Field( + description='Name of cluster for CloudWatch metrics' + ), + ] = None, +) -> Dict[str, Any]: + """Run a full load migration from source to target. + + This tool executes a full load migration from a source DocumentDB/MongoDB database + to a target DocumentDB database. It uses the fl-multiprocess.py script to perform + the migration with the specified parameters. + + Returns: + Dict[str, Any]: Status of the migration operation + """ + logger.info(f"Starting full load migration from {source_namespace} to {target_namespace or source_namespace}") + + # Validate parameters + if create_cloudwatch_metrics and not cluster_name: + raise ValueError("Must supply cluster_name when capturing CloudWatch metrics") + + # Auto-generate boundaries if not provided + if not boundaries: + logger.info("No boundaries provided, auto-generating boundaries") + + # Parse source namespace to get database and collection + db_name, coll_name = source_namespace.split('.', 1) + + # Default to 4 segments if not specified + num_segments = 4 + + # Generate boundaries + boundary_result = await generate_boundaries( + uri=source_uri, + database=db_name, + collection=coll_name, + num_segments=num_segments, + use_single_cursor=False + ) + + if not boundary_result["success"]: + raise ValueError(f"Failed to auto-generate boundaries: {boundary_result['message']}") + + boundaries = boundary_result["boundaries_csv"] + boundary_datatype = boundary_result["boundary_datatype"] + + logger.info(f"Auto-generated boundaries: {boundaries}") + logger.info(f"Boundary datatype: {boundary_datatype}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "fl-multiprocess.py") + + cmd = [ + "python3", + script_path, + "--source-uri", source_uri, + "--target-uri", target_uri, + "--source-namespace", source_namespace, + "--boundaries", boundaries, + "--boundary-datatype", boundary_datatype, + "--max-inserts-per-batch", str(max_inserts_per_batch), + "--feedback-seconds", str(feedback_seconds), + ] + + if target_namespace: + cmd.extend(["--target-namespace", target_namespace]) + + if dry_run: + cmd.append("--dry-run") + + if verbose: + cmd.append("--verbose") + + if create_cloudwatch_metrics: + cmd.append("--create-cloudwatch-metrics") + cmd.extend(["--cluster-name", cluster_name]) + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + 
bufsize=1, + universal_newlines=True + ) + + # Process is running in the background + return { + "success": True, + "message": "Full load migration started successfully", + "process_id": process.pid, + "command": " ".join(cmd), + } + except Exception as e: + logger.error(f"Error starting full load migration: {str(e)}") + raise ValueError(f"Failed to start full load migration: {str(e)}") + + +async def run_filtered_full_load( + source_uri: Annotated[ + str, + Field( + description='Source URI in MongoDB Connection String format' + ), + ], + target_uri: Annotated[ + str, + Field( + description='Target URI in MongoDB Connection String format' + ), + ], + source_namespace: Annotated[ + str, + Field( + description='Source Namespace as .' + ), + ], + target_namespace: Annotated[ + Optional[str], + Field( + description='Target Namespace as ., defaults to source_namespace' + ), + ] = None, + boundaries: Annotated[ + str, + Field( + description='Comma-separated list of boundaries for segmenting' + ), + ] = None, + boundary_datatype: Annotated[ + str, + Field( + description='Datatype of boundaries (objectid, string, int)' + ), + ] = 'objectid', + max_inserts_per_batch: Annotated[ + int, + Field( + description='Maximum number of inserts to include in a single batch' + ), + ] = 100, + feedback_seconds: Annotated[ + int, + Field( + description='Number of seconds between feedback output' + ), + ] = 60, + dry_run: Annotated[ + bool, + Field( + description='Read source changes only, do not apply to target' + ), + ] = False, + verbose: Annotated[ + bool, + Field( + description='Enable verbose logging' + ), + ] = False, +) -> Dict[str, Any]: + """Run a filtered full load migration from source to target. + + This tool executes a filtered full load migration from a source DocumentDB/MongoDB database + to a target DocumentDB database. It uses the fl-multiprocess-filtered.py script to perform + the migration with the specified parameters. This version filters documents based on TTL. 
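+
+    Example (illustrative values; boundaries are auto-generated when omitted):
+        result = await run_filtered_full_load(
+            source_uri="mongodb://user:password@source-host:27017/?replicaSet=rs0",
+            target_uri="mongodb://user:password@docdb-cluster:27017/?tls=true",
+            source_namespace="appdb.events",
+            max_inserts_per_batch=100,
+        )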
+ + Returns: + Dict[str, Any]: Status of the migration operation + """ + logger.info(f"Starting filtered full load migration from {source_namespace} to {target_namespace or source_namespace}") + + # Validate parameters + + # Auto-generate boundaries if not provided + if not boundaries: + logger.info("No boundaries provided, auto-generating boundaries") + + # Parse source namespace to get database and collection + db_name, coll_name = source_namespace.split('.', 1) + + # Default to 4 segments if not specified + num_segments = 4 + + # Generate boundaries + boundary_result = await generate_boundaries( + uri=source_uri, + database=db_name, + collection=coll_name, + num_segments=num_segments, + use_single_cursor=False + ) + + if not boundary_result["success"]: + raise ValueError(f"Failed to auto-generate boundaries: {boundary_result['message']}") + + boundaries = boundary_result["boundaries_csv"] + boundary_datatype = boundary_result["boundary_datatype"] + + logger.info(f"Auto-generated boundaries: {boundaries}") + logger.info(f"Boundary datatype: {boundary_datatype}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "fl-multiprocess-filtered.py") + + cmd = [ + "python3", + script_path, + "--source-uri", source_uri, + "--target-uri", target_uri, + "--source-namespace", source_namespace, + "--boundaries", boundaries, + "--boundary-datatype", boundary_datatype, + "--max-inserts-per-batch", str(max_inserts_per_batch), + "--feedback-seconds", str(feedback_seconds), + ] + + if target_namespace: + cmd.extend(["--target-namespace", target_namespace]) + + if dry_run: + cmd.append("--dry-run") + + if verbose: + cmd.append("--verbose") + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True + ) + + # Process is running in the background + return { + "success": True, + "message": "Filtered full load migration started successfully", + "process_id": process.pid, + "command": " ".join(cmd), + } + except Exception as e: + logger.error(f"Error starting filtered full load migration: {str(e)}") + raise ValueError(f"Failed to start filtered full load migration: {str(e)}") diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py new file mode 100644 index 0000000..2616830 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py @@ -0,0 +1,389 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. 
+ +"""Index tools for DocumentDB Migration MCP Server.""" + +import os +import sys +import time +import subprocess +import json +import tempfile +from datetime import datetime +from typing import Annotated, Any, Dict, List, Optional +from pydantic import Field +from loguru import logger + + +async def dump_indexes( + uri: Annotated[ + str, + Field( + description='URI to connect to MongoDB or Amazon DocumentDB' + ), + ], + output_dir: Annotated[ + str, + Field( + description='Directory to export indexes to' + ), + ] = None, + dry_run: Annotated[ + bool, + Field( + description='Perform processing, but do not actually export indexes' + ), + ] = False, + debug: Annotated[ + bool, + Field( + description='Output debugging information' + ), + ] = False, +) -> Dict[str, Any]: + """Dump indexes from a MongoDB or DocumentDB instance. + + This tool exports indexes metadata from a running MongoDB or Amazon DocumentDB deployment. + + Returns: + Dict[str, Any]: Status of the index dump operation + """ + logger.info(f"Starting index dump from {uri}") + + # Create a temporary directory if output_dir is not provided + if not output_dir: + output_dir = tempfile.mkdtemp(prefix="index_dump_") + logger.info(f"Created temporary directory for index dump: {output_dir}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "documentdb_index_tool.py") + + cmd = [ + "python3", + script_path, + "--dump-indexes", + "--dir", output_dir, + "--uri", uri, + ] + + if dry_run: + cmd.append("--dry-run") + + if debug: + cmd.append("--debug") + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + return { + "success": True, + "message": "Index dump completed successfully", + "output_dir": output_dir, + "stdout": result.stdout, + "stderr": result.stderr, + } + except subprocess.CalledProcessError as e: + logger.error(f"Error dumping indexes: {e.stderr}") + return { + "success": False, + "message": f"Failed to dump indexes: {e.stderr}", + "output_dir": output_dir, + "stdout": e.stdout, + "stderr": e.stderr, + } + except Exception as e: + logger.error(f"Error dumping indexes: {str(e)}") + return { + "success": False, + "message": f"Failed to dump indexes: {str(e)}", + "output_dir": output_dir, + } + + +async def restore_indexes( + uri: Annotated[ + str, + Field( + description='URI to connect to Amazon DocumentDB' + ), + ], + index_dir: Annotated[ + str, + Field( + description='Directory containing index metadata to restore from' + ), + ], + skip_incompatible: Annotated[ + bool, + Field( + description='Skip incompatible indexes when restoring metadata' + ), + ] = True, + support_2dsphere: Annotated[ + bool, + Field( + description='Support 2dsphere indexes creation (collections must use GeoJSON Point type for indexing)' + ), + ] = False, + dry_run: Annotated[ + bool, + Field( + description='Perform processing, but do not actually restore indexes' + ), + ] = False, + debug: Annotated[ + bool, + Field( + description='Output debugging information' + ), + ] = False, + shorten_index_name: Annotated[ + bool, + Field( + description='Shorten long index name to compatible length' + ), + ] = True, + skip_id_indexes: Annotated[ + bool, + Field( + description='Do not create _id indexes' + ), + ] = True, +) -> Dict[str, Any]: + """Restore indexes to an Amazon DocumentDB instance. + + This tool restores indexes from metadata to an Amazon DocumentDB instance. 
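+
+    Example (URI and directory are illustrative placeholders; index_dir is
+    typically the output_dir returned by dump_indexes):
+        result = await restore_indexes(
+            uri="mongodb://user:password@docdb-cluster:27017/?tls=true",
+            index_dir="/tmp/index_dump_example",
+            dry_run=True,
+        )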
+ + Returns: + Dict[str, Any]: Status of the index restore operation + """ + logger.info(f"Starting index restore to {uri}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "documentdb_index_tool.py") + + cmd = [ + "python3", + script_path, + "--restore-indexes", + "--dir", index_dir, + "--uri", uri, + ] + + if skip_incompatible: + cmd.append("--skip-incompatible") + + if support_2dsphere: + cmd.append("--support-2dsphere") + + if dry_run: + cmd.append("--dry-run") + + if debug: + cmd.append("--debug") + + if shorten_index_name: + cmd.append("--shorten-index-name") + + if skip_id_indexes: + cmd.append("--skip-id-indexes") + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + return { + "success": True, + "message": "Index restore completed successfully", + "stdout": result.stdout, + "stderr": result.stderr, + } + except subprocess.CalledProcessError as e: + logger.error(f"Error restoring indexes: {e.stderr}") + return { + "success": False, + "message": f"Failed to restore indexes: {e.stderr}", + "stdout": e.stdout, + "stderr": e.stderr, + } + except Exception as e: + logger.error(f"Error restoring indexes: {str(e)}") + return { + "success": False, + "message": f"Failed to restore indexes: {str(e)}", + } + + +async def show_compatibility_issues( + index_dir: Annotated[ + str, + Field( + description='Directory containing index metadata to check' + ), + ], + debug: Annotated[ + bool, + Field( + description='Output debugging information' + ), + ] = False, +) -> Dict[str, Any]: + """Show compatibility issues with Amazon DocumentDB. + + This tool checks index metadata for compatibility issues with Amazon DocumentDB. + + Returns: + Dict[str, Any]: Compatibility issues found + """ + logger.info(f"Checking compatibility issues in {index_dir}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "documentdb_index_tool.py") + + cmd = [ + "python3", + script_path, + "--show-issues", + "--dir", index_dir, + ] + + if debug: + cmd.append("--debug") + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + # Try to parse the JSON output + try: + issues = json.loads(result.stdout) + except json.JSONDecodeError: + issues = {"raw_output": result.stdout} + + return { + "success": True, + "message": "Compatibility check completed successfully", + "issues": issues, + "stdout": result.stdout, + "stderr": result.stderr, + } + except subprocess.CalledProcessError as e: + logger.error(f"Error checking compatibility: {e.stderr}") + return { + "success": False, + "message": f"Failed to check compatibility: {e.stderr}", + "stdout": e.stdout, + "stderr": e.stderr, + } + except Exception as e: + logger.error(f"Error checking compatibility: {str(e)}") + return { + "success": False, + "message": f"Failed to check compatibility: {str(e)}", + } + + +async def show_compatible_indexes( + index_dir: Annotated[ + str, + Field( + description='Directory containing index metadata to check' + ), + ], + debug: Annotated[ + bool, + Field( + description='Output debugging information' + ), + ] = False, +) -> Dict[str, Any]: + """Show compatible indexes with Amazon DocumentDB. + + This tool shows all indexes that are compatible with Amazon DocumentDB. 
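+
+    Example (the directory is an illustrative placeholder, typically a previous
+    dump_indexes output_dir):
+        result = await show_compatible_indexes(index_dir="/tmp/index_dump_example")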
+ + Returns: + Dict[str, Any]: Compatible indexes + """ + logger.info(f"Checking compatible indexes in {index_dir}") + + # Build command + script_path = os.path.join(os.path.dirname(__file__), "scripts", "documentdb_index_tool.py") + + cmd = [ + "python3", + script_path, + "--show-compatible", + "--dir", index_dir, + ] + + if debug: + cmd.append("--debug") + + # Execute command + try: + logger.info(f"Executing command: {' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + + # Try to parse the JSON output + try: + compatible_indexes = json.loads(result.stdout) + except json.JSONDecodeError: + compatible_indexes = {"raw_output": result.stdout} + + return { + "success": True, + "message": "Compatible indexes check completed successfully", + "compatible_indexes": compatible_indexes, + "stdout": result.stdout, + "stderr": result.stderr, + } + except subprocess.CalledProcessError as e: + logger.error(f"Error checking compatible indexes: {e.stderr}") + return { + "success": False, + "message": f"Failed to check compatible indexes: {e.stderr}", + "stdout": e.stdout, + "stderr": e.stderr, + } + except Exception as e: + logger.error(f"Error checking compatible indexes: {str(e)}") + return { + "success": False, + "message": f"Failed to check compatible indexes: {str(e)}", + } diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/cdc-multiprocess.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/cdc-multiprocess.py new file mode 100644 index 0000000..1ae8208 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/cdc-multiprocess.py @@ -0,0 +1,660 @@ +from datetime import datetime, timedelta +import os +import sys +import time +import pymongo +from bson.timestamp import Timestamp +import threading +import multiprocessing as mp +import hashlib +import argparse +import boto3 +import warnings + + +def logIt(threadnum, message): + logTimeStamp = datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{}] thread {:>3d} | {}".format(logTimeStamp,threadnum,message)) + + +def oplog_processor(threadnum, appConfig, perfQ): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + if appConfig['verboseLogging']: + logIt(threadnum,'thread started') + + c = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrcdc') + oplog = c.local.oplog.rs + + destConnection = pymongo.MongoClient(host=appConfig["targetUri"],appname='migrcdc') + destDatabase = destConnection[appConfig["targetNs"].split('.',1)[0]] + destCollection = destDatabase[appConfig["targetNs"].split('.',1)[1]] + + ''' + i = insert + u = update + d = delete + c = command + db = database + n = no-op + ''' + + startTime = time.time() + lastFeedback = time.time() + lastBatch = time.time() + + allDone = False + threadOplogEntries = 0 + + bulkOpList = [] + + # list with replace, not insert, in case document already exists (replaying old oplog) + bulkOpListReplace = [] + numCurrentBulkOps = 0 + + numTotalBatches = 0 + + printedFirstTs = False + myCollectionOps = 0 + + # starting timestamp + endTs = appConfig["startTs"] + + while not allDone: + if appConfig['verboseLogging']: + logIt(threadnum,"Creating oplog tailing cursor for timestamp {}".format(endTs.as_datetime())) + + cursor = oplog.find({'ts': {'$gte': endTs},'ns':appConfig["sourceNs"]},cursor_type=pymongo.CursorType.TAILABLE_AWAIT,oplog_replay=True) + + while 
cursor.alive and not allDone: + for doc in cursor: + # check if time to exit + if ((time.time() - startTime) > appConfig['durationSeconds']) and (appConfig['durationSeconds'] != 0): + allDone = True + break + + endTs = doc['ts'] + + # NOTE: Python's non-deterministic hash() cannot be used as it is seeded at startup, since this code is multiprocessing we need all hash calls to be the same between processes + # hash(str(doc['o']['_id'])) + if (((doc['op'] in ['i','d']) and (doc['ns'] == appConfig["sourceNs"]) and ((int(hashlib.sha512(str(doc['o']['_id']).encode('utf-8')).hexdigest(), 16) % appConfig["numProcessingThreads"]) == threadnum)) or + ((doc['op'] in ['u']) and (doc['ns'] == appConfig["sourceNs"]) and ((int(hashlib.sha512(str(doc['o2']['_id']).encode('utf-8')).hexdigest(), 16) % appConfig["numProcessingThreads"]) == threadnum))): + # this is for my thread + + threadOplogEntries += 1 + + if (not printedFirstTs) and (doc['op'] in ['i','u','d']) and (doc['ns'] == appConfig["sourceNs"]): + if appConfig['verboseLogging']: + logIt(threadnum,'first timestamp = {} aka {}'.format(doc['ts'],doc['ts'].as_datetime())) + printedFirstTs = True + + if (doc['op'] == 'i'): + # insert + if (doc['ns'] == appConfig["sourceNs"]): + myCollectionOps += 1 + bulkOpList.append(pymongo.InsertOne(doc['o'])) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + bulkOpListReplace.append(pymongo.ReplaceOne({'_id':doc['o']['_id']},doc['o'],upsert=True)) + numCurrentBulkOps += 1 + else: + pass + + elif (doc['op'] == 'u'): + # update + if (doc['ns'] == appConfig["sourceNs"]): + myCollectionOps += 1 + # field "$v" is not present in MongoDB 3.4 + doc['o'].pop('$v',None) + bulkOpList.append(pymongo.UpdateOne(doc['o2'],doc['o'],upsert=False)) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + bulkOpListReplace.append(pymongo.UpdateOne(doc['o2'],doc['o'],upsert=False)) + numCurrentBulkOps += 1 + else: + pass + + elif (doc['op'] == 'd'): + # delete + if (doc['ns'] == appConfig["sourceNs"]): + myCollectionOps += 1 + bulkOpList.append(pymongo.DeleteOne(doc['o'])) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + bulkOpListReplace.append(pymongo.DeleteOne(doc['o'])) + numCurrentBulkOps += 1 + else: + pass + + elif (doc['op'] == 'c'): + # command + pass + + elif (doc['op'] == 'n'): + # no-op + pass + + else: + print(doc) + sys.exit(1) + + if ((numCurrentBulkOps >= appConfig["maxOperationsPerBatch"]) or (time.time() >= (lastBatch + appConfig["maxSecondsBetweenBatches"]))) and (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + except: + # replace inserts as replaces + result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"endts":endTs,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + lastBatch = time.time() + + if ((numCurrentBulkOps >= appConfig["maxOperationsPerBatch"]) or (time.time() >= (lastBatch + appConfig["maxSecondsBetweenBatches"]))) and (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + except: + # replace inserts as replaces + result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + 
perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"endts":endTs,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + lastBatch = time.time() + + # nothing arrived in the oplog for 1 second, pause before trying again + time.sleep(1) + + if (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + except: + # replace inserts as replaces + result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"endts":endTs,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + c.close() + destConnection.close() + + perfQ.put({"name":"processCompleted","processNum":threadnum}) + + +def change_stream_processor(threadnum, appConfig, perfQ): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + if appConfig['verboseLogging']: + logIt(threadnum,'thread started') + + sourceConnection = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrcdc') + sourceDb = sourceConnection[appConfig["sourceNs"].split('.',1)[0]] + sourceColl = sourceDb[appConfig["sourceNs"].split('.',1)[1]] + + destConnection = pymongo.MongoClient(host=appConfig["targetUri"],appname='migrcdc') + destDatabase = destConnection[appConfig["targetNs"].split('.',1)[0]] + destCollection = destDatabase[appConfig["targetNs"].split('.',1)[1]] + + startTime = time.time() + lastFeedback = time.time() + lastBatch = time.time() + + allDone = False + threadOplogEntries = 0 + perfReportInterval = 1 + nextPerfReportTime = time.time() + perfReportInterval + + bulkOpList = [] + + # list with replace, not insert, in case document already exists (replaying old oplog) + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numReportBulkOps = 0 + + numTotalBatches = 0 + + printedFirstTs = False + myCollectionOps = 0 + + # starting timestamp + endTs = appConfig["startTs"] + + if (appConfig["startTs"] == "RESUME_TOKEN"): + stream = sourceColl.watch(resume_after={'_data': appConfig["startPosition"]}, full_document='updateLookup', pipeline=[{'$match': {'operationType': {'$in': ['insert','update','replace','delete']}}},{'$project':{'updateDescription':0}}]) + else: + stream = sourceColl.watch(start_at_operation_time=endTs, full_document='updateLookup', pipeline=[{'$match': {'operationType': {'$in': ['insert','update','replace','delete']}}},{'$project':{'updateDescription':0}}]) + + if appConfig['verboseLogging']: + if (appConfig["startTs"] == "RESUME_TOKEN"): + logIt(threadnum,"Creating change stream cursor for resume token {}".format(appConfig["startPosition"])) + else: + logIt(threadnum,"Creating change stream cursor for timestamp {}".format(endTs.as_datetime())) + + while not allDone: + for change in stream: + # check if time to exit + if ((time.time() - startTime) > appConfig['durationSeconds']) and (appConfig['durationSeconds'] != 0): + allDone = True + break + + endTs = change['clusterTime'] + resumeToken = change['_id']['_data'] + thisNs = change['ns']['db']+'.'+change['ns']['coll'] + thisOp = change['operationType'] + + # NOTE: Python's non-deterministic hash() cannot be used as it is seeded at startup, since this code is multiprocessing we need all hash calls to be the same between processes + # hash(str(doc['o']['_id'])) + #if ((thisOp in ['insert','update','replace','delete']) and + # (thisNs == appConfig["sourceNs"]) and + if 
((int(hashlib.sha512(str(change['documentKey']).encode('utf-8')).hexdigest(), 16) % appConfig["numProcessingThreads"]) == threadnum): + # this is for my thread + + threadOplogEntries += 1 + + if (not printedFirstTs) and (thisOp in ['insert','update','replace','delete']) and (thisNs == appConfig["sourceNs"]): + if appConfig['verboseLogging']: + logIt(threadnum,'first timestamp = {} aka {}'.format(change['clusterTime'],change['clusterTime'].as_datetime())) + printedFirstTs = True + + if (thisOp == 'insert'): + # insert + if (thisNs == appConfig["sourceNs"]): + myCollectionOps += 1 + bulkOpList.append(pymongo.InsertOne(change['fullDocument'])) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + #bulkOpListReplace.append(pymongo.ReplaceOne({'_id':change['documentKey']},change['fullDocument'],upsert=True)) + bulkOpListReplace.append(pymongo.ReplaceOne(change['documentKey'],change['fullDocument'],upsert=True)) + numCurrentBulkOps += 1 + else: + pass + + elif (thisOp in ['update','replace']): + # update/replace + if (change['fullDocument'] is not None): + if (thisNs == appConfig["sourceNs"]): + myCollectionOps += 1 + #bulkOpList.append(pymongo.ReplaceOne({'_id':change['documentKey']},change['fullDocument'],upsert=True)) + bulkOpList.append(pymongo.ReplaceOne(change['documentKey'],change['fullDocument'],upsert=True)) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + #bulkOpListReplace.append(pymongo.ReplaceOne({'_id':change['documentKey']},change['fullDocument'],upsert=True)) + bulkOpListReplace.append(pymongo.ReplaceOne(change['documentKey'],change['fullDocument'],upsert=True)) + numCurrentBulkOps += 1 + else: + pass + + elif (thisOp == 'delete'): + # delete + if (thisNs == appConfig["sourceNs"]): + myCollectionOps += 1 + bulkOpList.append(pymongo.DeleteOne({'_id':change['documentKey']['_id']})) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + bulkOpListReplace.append(pymongo.DeleteOne({'_id':change['documentKey']['_id']})) + numCurrentBulkOps += 1 + else: + pass + + elif (thisOp in ['drop','rename','dropDatabase','invalidate']): + # operations we do not track + pass + + else: + print(change) + sys.exit(1) + + if time.time() > nextPerfReportTime: + nextPerfReportTime = time.time() + perfReportInterval + perfQ.put({"name":"batchCompleted","operations":numReportBulkOps,"endts":endTs,"processNum":threadnum,"resumeToken":resumeToken}) + numReportBulkOps = 0 + + if ((numCurrentBulkOps >= appConfig["maxOperationsPerBatch"]) or (time.time() >= (lastBatch + appConfig["maxSecondsBetweenBatches"]))) and (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + except: + # replace inserts as replaces + result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + + bulkOpList = [] + bulkOpListReplace = [] + numReportBulkOps += numCurrentBulkOps + numCurrentBulkOps = 0 + numTotalBatches += 1 + lastBatch = time.time() + + # nothing arrived in the oplog for 1 second, pause before trying again + #time.sleep(1) + + if (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + except: + # replace inserts as replaces + result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + 
perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"endts":endTs,"processNum":threadnum,"resumeToken":resumeToken}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + sourceConnection.close() + destConnection.close() + + perfQ.put({"name":"processCompleted","processNum":threadnum}) + + +def get_resume_token(appConfig): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + logIt(-1,'getting current change stream resume token') + + sourceConnection = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrcdc') + sourceDb = sourceConnection[appConfig["sourceNs"].split('.',1)[0]] + sourceColl = sourceDb[appConfig["sourceNs"].split('.',1)[1]] + + allDone = False + + stream = sourceColl.watch() + + while not allDone: + for change in stream: + resumeToken = change['_id']['_data'] + logIt(-1,'Change stream resume token is {}'.format(resumeToken)) + allDone = True + break + + +def reporter(appConfig, perfQ): + createCloudwatchMetrics = appConfig['createCloudwatchMetrics'] + clusterName = appConfig['clusterName'] + + if appConfig['verboseLogging']: + logIt(-1,'reporting thread started') + + if createCloudwatchMetrics: + # only instantiate client if needed + cloudWatchClient = boto3.client('cloudwatch') + + startTime = time.time() + lastTime = time.time() + + # number of seconds between posting metrics to cloudwatch + cloudwatchPutSeconds = 30 + lastCloudwatchPutTime = time.time() + + lastProcessedOplogEntries = 0 + nextReportTime = startTime + appConfig["feedbackSeconds"] + + resumeToken = 'N/A' + + numWorkersCompleted = 0 + numProcessedOplogEntries = 0 + + dtDict = {} + + while (numWorkersCompleted < appConfig["numProcessingThreads"]): + time.sleep(appConfig["feedbackSeconds"]) + nowTime = time.time() + + numBatchEntries = 0 + while not perfQ.empty(): + qMessage = perfQ.get_nowait() + if qMessage['name'] == "batchCompleted": + numBatchEntries += 1 + numProcessedOplogEntries += qMessage['operations'] + thisEndDt = qMessage['endts'].as_datetime().replace(tzinfo=None) + thisProcessNum = qMessage['processNum'] + if (thisProcessNum in dtDict) and (thisEndDt > dtDict[thisProcessNum]): + dtDict[thisProcessNum] = thisEndDt + else: + dtDict[thisProcessNum] = thisEndDt + #print("received endTs = {}".format(thisEndTs.as_datetime())) + if 'resumeToken' in qMessage: + resumeToken = qMessage['resumeToken'] + else: + resumeToken = 'N/A' + + elif qMessage['name'] == "processCompleted": + numWorkersCompleted += 1 + + # total total + elapsedSeconds = nowTime - startTime + totalOpsPerSecond = int(numProcessedOplogEntries / elapsedSeconds) + + # elapsed hours, minutes, seconds + thisHours, rem = divmod(elapsedSeconds, 3600) + thisMinutes, thisSeconds = divmod(rem, 60) + thisHMS = "{:0>2}:{:0>2}:{:05.2f}".format(int(thisHours),int(thisMinutes),thisSeconds) + + # this interval + intervalElapsedSeconds = nowTime - lastTime + intervalOpsPerSecond = int((numProcessedOplogEntries - lastProcessedOplogEntries) / intervalElapsedSeconds) + + # how far behind current time + if numBatchEntries == 0: + # no work this interval, we are fully caught up + avgSecondsBehind = 0 + else: + dtUtcNow = datetime.utcnow() + totSecondsBehind = 0 + numSecondsBehindEntries = 0 + for thisDt in dtDict: + totSecondsBehind += (dtUtcNow - dtDict[thisDt].replace(tzinfo=None)).total_seconds() + numSecondsBehindEntries += 1 + + avgSecondsBehind = int(totSecondsBehind / max(numSecondsBehindEntries,1)) + + logTimeStamp = 
datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{0}] elapsed {1} | total o/s {2:9,d} | interval o/s {3:9,d} | tot {4:16,d} | {5:12,d} secs behind | resume token = {6}".format(logTimeStamp,thisHMS,totalOpsPerSecond,intervalOpsPerSecond,numProcessedOplogEntries,avgSecondsBehind,resumeToken)) + nextReportTime = nowTime + appConfig["feedbackSeconds"] + + lastTime = nowTime + lastProcessedOplogEntries = numProcessedOplogEntries + + # output CW metrics every cloudwatchPutSeconds seconds + if createCloudwatchMetrics and ((time.time() - lastCloudwatchPutTime) > cloudwatchPutSeconds): + # log to cloudwatch + cloudWatchClient.put_metric_data( + Namespace='CustomDocDB', + MetricData=[{'MetricName':'MigratorCDCOperationsPerSecond','Dimensions':[{'Name':'Cluster','Value':clusterName}],'Value':intervalOpsPerSecond,'StorageResolution':60}, + {'MetricName':'MigratorCDCNumSecondsBehind','Dimensions':[{'Name':'Cluster','Value':clusterName}],'Value':avgSecondsBehind,'StorageResolution':60}]) + + lastCloudwatchPutTime = time.time() + + +def main(): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + parser = argparse.ArgumentParser(description='CDC replication tool.') + + parser.add_argument('--skip-python-version-check', + required=False, + action='store_true', + help='Permit execution on Python 3.6 and prior') + + parser.add_argument('--source-uri', + required=True, + type=str, + help='Source URI') + + parser.add_argument('--target-uri', + required=True, + type=str, + help='Target URI') + + parser.add_argument('--source-namespace', + required=True, + type=str, + help='Source Namespace as .') + + parser.add_argument('--target-namespace', + required=False, + type=str, + help='Target Namespace as ., defaults to --source-namespace') + + parser.add_argument('--duration-seconds', + required=False, + type=int, + default=0, + help='Number of seconds to run before exiting, 0 = run forever') + + parser.add_argument('--feedback-seconds', + required=False, + type=int, + default=60, + help='Number of seconds between feedback output') + + parser.add_argument('--threads', + required=False, + type=int, + default=1, + help='Number of threads (parallel processing)') + + parser.add_argument('--max-seconds-between-batches', + required=False, + type=int, + default=5, + help='Maximum number of seconds to await full batch') + + parser.add_argument('--max-operations-per-batch', + required=False, + type=int, + default=100, + help='Maximum number of operations to include in a single batch') + + parser.add_argument('--dry-run', + required=False, + action='store_true', + help='Read source changes only, do not apply to target') + + parser.add_argument('--start-position', + required=True, + type=str, + help='Starting position - 0 for all available changes, YYYY-MM-DD+HH:MM:SS in UTC, or change stream resume token') + + parser.add_argument('--verbose', + required=False, + action='store_true', + help='Enable verbose logging') + + parser.add_argument('--use-oplog', + required=False, + action='store_true', + help='Use the oplog as change data capture source') + + parser.add_argument('--use-change-stream', + required=False, + action='store_true', + help='Use change streams as change data capture source') + + parser.add_argument('--get-resume-token', + required=False, + action='store_true', + help='Display the current change stream resume token') + + parser.add_argument('--create-cloudwatch-metrics',required=False,action='store_true',help='Create CloudWatch metrics when garbage collection is 
active') + parser.add_argument('--cluster-name',required=False,type=str,help='Name of cluster for CloudWatch metrics') + + args = parser.parse_args() + + MIN_PYTHON = (3, 7) + if (not args.skip_python_version_check) and (sys.version_info < MIN_PYTHON): + sys.exit("\nPython %s.%s or later is required.\n" % MIN_PYTHON) + + if (not args.use_oplog) and (not args.use_change_stream): + message = "Must supply either --use-oplog or --use-change-stream" + parser.error(message) + + if (args.use_oplog) and (args.use_change_stream): + message = "Cannot supply both --use-oplog or --use-change-stream" + parser.error(message) + + if (args.use_change_stream) and (args.start_position == "0"): + message = "--start-position must be supplied as YYYY-MM-DD+HH:MM:SS in UTC or resume token when executing in --use-change-stream mode" + parser.error(message) + + if args.create_cloudwatch_metrics and (args.cluster_name is None): + sys.exit("\nMust supply --cluster-name when capturing CloudWatch metrics.\n") + + appConfig = {} + appConfig['sourceUri'] = args.source_uri + appConfig['targetUri'] = args.target_uri + appConfig['numProcessingThreads'] = args.threads + appConfig['maxSecondsBetweenBatches'] = args.max_seconds_between_batches + appConfig['maxOperationsPerBatch'] = args.max_operations_per_batch + appConfig['durationSeconds'] = args.duration_seconds + appConfig['feedbackSeconds'] = args.feedback_seconds + appConfig['dryRun'] = args.dry_run + appConfig['sourceNs'] = args.source_namespace + if not args.target_namespace: + appConfig['targetNs'] = args.source_namespace + else: + appConfig['targetNs'] = args.target_namespace + appConfig['startPosition'] = args.start_position + appConfig['verboseLogging'] = args.verbose + appConfig['createCloudwatchMetrics'] = args.create_cloudwatch_metrics + appConfig['clusterName'] = args.cluster_name + + if args.get_resume_token: + get_resume_token(appConfig) + sys.exit(0) + + if args.use_oplog: + appConfig['cdcSource'] = 'oplog' + else: + appConfig['cdcSource'] = 'changeStream' + + logIt(-1,"processing {} using {} threads".format(appConfig['cdcSource'],appConfig['numProcessingThreads'])) + + if len(appConfig["startPosition"]) == 36: + # resume token + appConfig["startTs"] = "RESUME_TOKEN" + + logIt(-1,"starting with resume token = {}".format(appConfig["startPosition"])) + + else: + if appConfig["startPosition"] == "0": + # start with first oplog entry + c = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrcdc') + oplog = c.local.oplog.rs + first = oplog.find().sort('$natural', pymongo.ASCENDING).limit(1).next() + appConfig["startTs"] = first['ts'] + c.close() + elif appConfig["startPosition"].upper() == "NOW": + # start with current time + appConfig["startTs"] = Timestamp(datetime.utcnow(), 1) + else: + # start at an arbitrary position + appConfig["startTs"] = Timestamp(datetime.fromisoformat(args.start_position), 1) + + logIt(-1,"starting with timestamp = {}".format(appConfig["startTs"].as_datetime())) + + mp.set_start_method('spawn') + q = mp.Manager().Queue() + + t = threading.Thread(target=reporter,args=(appConfig,q)) + t.start() + + processList = [] + for loop in range(appConfig["numProcessingThreads"]): + if (appConfig['cdcSource'] == 'oplog'): + p = mp.Process(target=oplog_processor,args=(loop,appConfig,q)) + else: + p = mp.Process(target=change_stream_processor,args=(loop,appConfig,q)) + processList.append(p) + + for process in processList: + process.start() + + for process in processList: + process.join() + + t.join() + + +if __name__ == "__main__": + 
main() diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py new file mode 100644 index 0000000..36ed6f4 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py @@ -0,0 +1,643 @@ +#!/bin/env python +""" + Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + A copy of the License is located at + + http://www.apache.org/licenses/LICENSE-2.0 + + or in the "license" file accompanying this file. This file is distributed + on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + express or implied. See the License for the specific language governing + permissions and limitations under the License. +""" + +import argparse +import errno +import json +import logging +import os +import sys +import string +import random + +from bson.json_util import dumps +from pymongo import MongoClient +from pymongo.errors import (ConnectionFailure, OperationFailure, ServerSelectionTimeoutError) +from collections import OrderedDict + +alphabet = string.ascii_lowercase + string.digits + +class AutovivifyDict(dict): + """N depth defaultdict.""" + + def __getitem__(self, item): + try: + return dict.__getitem__(self, item) + except KeyError: + value = self[item] = type(self)() + return value + + +class DocumentDbLimits(object): + """ + DocumentDB limits + """ + + def __init__(self): + pass + + COLLECTION_QUALIFIED_INDEX_NAME_MAX_LENGTH = 255 + COLLECTION_NAME_MAX_LENGTH = 57 + COMPOUND_INDEX_MAX_KEYS = 32 + DATABASE_NAME_MAX_LENGTH = 63 + FULLY_QUALIFIED_INDEX_NAME_MAX_LENGTH = 377 + INDEX_KEY_MAX_LENGTH = 2048 + NAMESPACE_MAX_LENGTH = 120 + + +class DocumentDbUnsupportedFeatures(object): + """ + List of unsupported features in DocumentDB + """ + + def __init__(self): + pass + + UNSUPPORTED_INDEX_TYPES = ['2d', '2dsphere', 'geoHaystack', 'hashed'] + UNSUPPORTED_INDEX_OPTIONS = ['storageEngine', 'collation', 'dropDuplicates'] + UNSUPPORTED_COLLECTION_OPTIONS = ['capped'] + IGNORED_INDEX_OPTIONS = ['2dsphereIndexVersion'] + + +class IndexToolConstants(object): + """ + constants used in this tool + """ + + def __init__(self): + pass + + DATABASES_TO_SKIP = ['admin', 'config', 'local', 'system'] + SYSTEM_OBJECTS_TO_SKIP = ['system.buckets','system.namespaces','system.indexes','system.profile','system.js','system.views'] + METADATA_FILES_TO_SKIP = ['system.indexes.metadata.json', 'system.profile.metadata.json', 'system.users.metadata.json', 'system.views.metadata.json'] + METADATA_FILE_SUFFIX_PATTERN = 'metadata.json' + EXCEEDED_LIMITS = 'exceeded_limits' + FILE_PATH = 'filepath' + ID = '_id_' + INDEXES = 'indexes' + INDEX_DEFINITION = 'definition' + INDEX_NAME = 'name' + INDEX_VERSION = 'v' + INDEX_KEY = 'key' + INDEX_NAMESPACE = 'ns' + NAMESPACE = 'ns' + OPTIONS = 'options' + UNSUPPORTED_INDEX_OPTIONS_KEY = 'unsupported_index_options' + UNSUPPORTED_COLLECTION_OPTIONS_KEY = 'unsupported_collection_options' + UNSUPPORTED_INDEX_TYPES_KEY = 'unsupported_index_types' + + +class DocumentDbIndexTool(IndexToolConstants): + """ + Traverses a mongodump directory structure performs discovery and index restore functions. 
+ """ + + def __init__(self, args): + super(DocumentDbIndexTool, self).__init__() + self.args = args + + log_level = logging.INFO + + if self.args.debug is True: + log_level = logging.DEBUG + + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + + root_handler = logging.StreamHandler(sys.stdout) + root_handler.setLevel(log_level) + formatter = logging.Formatter('%(asctime)s: %(message)s') + root_handler.setFormatter(formatter) + root_logger.addHandler(root_handler) + + def _mkdir_p(self, filepath): + try: + os.makedirs(filepath) + except OSError as ose: + if ose.errno == errno.EEXIST and os.path.isdir(filepath): + pass + else: + raise + + def _get_db_connection(self, uri): + """Connect to instance, returning a connection""" + logging.info("Connecting to instance using provided URI") + + mongodb_client = MongoClient(host=uri,appname='indxtool') + + # force the client to actually connect + mongodb_client.admin.command('ismaster') + + logging.info(" .. successfully connected") + + return mongodb_client + + def _get_compatible_metadata(self, metadata, compatibility_issues): + compatible_metadata = metadata.copy() + for db_name in compatibility_issues: + if self.EXCEEDED_LIMITS in compatibility_issues[db_name]: + del compatible_metadata[db_name] + continue + + for collection_name in compatibility_issues[db_name]: + if self.UNSUPPORTED_COLLECTION_OPTIONS_KEY in compatibility_issues[ + db_name][collection_name]: + del compatible_metadata[db_name][collection_name] + continue + if self.EXCEEDED_LIMITS in compatibility_issues[db_name][ + collection_name]: + del compatible_metadata[db_name][collection_name] + continue + for index_name in compatibility_issues[db_name][ + collection_name]: + del compatible_metadata[db_name][collection_name][ + self.INDEXES][index_name] + + return metadata + + def _get_metadata_from_file(self, filepath): + """ + Given a path to a metadata file, return the JSON data structure parsed + from the contents, formatted. + """ + with open(filepath, 'rt') as metadata_file: + logging.debug("Getting metadata from file: %s", filepath) + + file_metadata = json.load(metadata_file, object_pairs_hook=OrderedDict) + collection_metadata = OrderedDict() + indexes = file_metadata.get(self.INDEXES, None) + + # every collection should have at least the _id_ index. 
If no indexes are listed, the + # metadata document is malformed and we should error out + if indexes is None: + raise Exception( + "Malformed metadata document {} has no indexes.".format( + filepath)) + + if (len(indexes) == 0): + # no indexes for this collection + db_name = os.path.basename(os.path.dirname(filepath)) + thisFileName = os.path.basename(filepath) + collection_name = thisFileName[0:(len(thisFileName)-len(self.METADATA_FILE_SUFFIX_PATTERN)-1)] + else: + first_index = indexes[0] + if self.NAMESPACE in first_index: + first_index_namespace = first_index[self.NAMESPACE] + (db_name, collection_name) = first_index_namespace.split('.', 1) + else: + db_name = os.path.basename(os.path.dirname(filepath)) + thisFileName = os.path.basename(filepath) + collection_name = thisFileName[0:(len(thisFileName)-len(self.METADATA_FILE_SUFFIX_PATTERN)-1)] + + collection_metadata[self.FILE_PATH] = filepath + + for index in indexes: + index_name = index.pop(self.INDEX_NAME) + if self.INDEXES not in collection_metadata: + collection_metadata[self.INDEXES] = OrderedDict() + collection_metadata[self.INDEXES][index_name] = index + + if self.OPTIONS in file_metadata: + collection_metadata[self.OPTIONS] = file_metadata[ + self.OPTIONS] + + return db_name, collection_name, collection_metadata + + def _find_metadata_files(self, start_dir): + """Recurse through subdirectories looking for metadata files""" + metadata_files = [] + + for (dirpath, dirnames, files) in os.walk(start_dir): + for filename in files: + if (filename.endswith(self.METADATA_FILE_SUFFIX_PATTERN) and filename not in self.METADATA_FILES_TO_SKIP): + metadata_files.append(os.path.join(dirpath, filename)) + + return metadata_files + + def _dump_indexes_from_server(self, connection, output_dir, dry_run=False): + """ + Discover all indexes in a mongodb server and dump them + to files using the mongodump format + """ + + logging.info("Retrieving indexes from server...") + try: + database_info = connection.admin.command({'listDatabases': 1}) + + for database_doc in database_info['databases']: + database_name = database_doc['name'] + logging.debug("Database: %s", database_name) + + if database_name in self.DATABASES_TO_SKIP: + continue + + database_path = os.path.join(output_dir, database_name) + + if dry_run is not True: + self._mkdir_p(database_path) + + # Write out each collection's stats in this database + for collection_name in connection[ + database_name].list_collection_names(): + logging.debug("Collection: %s", collection_name) + collection_metadata = {} + collection_metadata[self.OPTIONS] = connection[ + database_name][collection_name].options() + if 'viewOn' in collection_metadata[self.OPTIONS]: + # views cannot have indexes, skip to next collection + logging.debug(" skipping, view not collection") + continue + if collection_name in self.SYSTEM_OBJECTS_TO_SKIP: + # system objects, skip to next collection + logging.debug(" skipping, system object") + continue + collection_indexes = connection[database_name][ + collection_name].list_indexes() + thisIndexes = [] + for thisIndex in collection_indexes: + if "ns" not in thisIndex: + # mdb44+ eliminated the "ns" attribute + thisIndex["ns"] = "{}.{}".format(database_name,collection_name) + thisIndexes.append(thisIndex) + collection_metadata[self.INDEXES] = thisIndexes + + collection_metadata_filename = "{}.{}".format( + collection_name, self.METADATA_FILE_SUFFIX_PATTERN) + collection_metadata_filepath = os.path.join( + database_path, collection_metadata_filename) + + if dry_run is True: + 
logging.info("\n%s.%s\n%s", + database_name, collection_name, + dumps(collection_metadata)) + + else: + logging.debug( + "Writing collection metadata for collection: %s", + collection_name) + with open(collection_metadata_filepath, + 'wt') as collection_metadata_file: + collection_metadata_file.write( + dumps(collection_metadata, + separators=(',', ':'))) + + logging.info( + "Completed writing index metadata to local folder: %s", + output_dir) + + except Exception: + logging.exception("Failed to dump indexes from server") + sys.exit() + + def get_metadata(self, start_path): + """ + Recursively search the supplied start_path, discovering all JSON metadata files and adding the + information to our metadata data structure. + """ + try: + logging.debug( + "Beginning recursive discovery of metadata files, starting at %s", + start_path) + metadata_files = self._find_metadata_files(start_path) + + if metadata_files == []: + logging.error("No metadata files found beneath directory: %s", + start_path) + sys.exit() + + logging.debug("Metadata files found: {}%s", metadata_files) + + metadata = AutovivifyDict() + + for filepath in metadata_files: + (db_name, collection_name, + collection_metadata) = self._get_metadata_from_file(filepath) + metadata[db_name][collection_name] = collection_metadata + + return metadata + + except Exception: + logging.exception("Failed to discover dump indexes") + sys.exit() + + def find_compatibility_issues(self, metadata): + """Check db, collection and index data in metadata files for compatibility with DocumentDB""" + compatibility_issues = AutovivifyDict() + + for db_name in metadata: + db_metadata = metadata[db_name] + + if len(db_name) > DocumentDbLimits.DATABASE_NAME_MAX_LENGTH: + message = 'Database name greater than {} characters'.format( + DocumentDbLimits.DATABASE_NAME_MAX_LENGTH) + compatibility_issues[db_name][ + self.EXCEEDED_LIMITS][message] = db_name + + for collection_name in metadata[db_name]: + collection_metadata = db_metadata[collection_name] + + if len(collection_name + ) > DocumentDbLimits.COLLECTION_NAME_MAX_LENGTH: + message = 'Collection name greater than {} characters'.format( + DocumentDbLimits.COLLECTION_NAME_MAX_LENGTH) + compatibility_issues[db_name][collection_name][ + self.EXCEEDED_LIMITS][message] = collection_name + + collection_namespace = '{}.{}'.format(db_name, collection_name) + # . + if len(collection_namespace + ) > DocumentDbLimits.NAMESPACE_MAX_LENGTH: + message = 'Namespace greater than {} characters'.format( + DocumentDbLimits.NAMESPACE_MAX_LENGTH) + compatibility_issues[db_name][collection_name][ + self.EXCEEDED_LIMITS][message] = collection_namespace + + if self.OPTIONS in collection_metadata: + for option_key in collection_metadata[self.OPTIONS]: + if option_key in DocumentDbUnsupportedFeatures.UNSUPPORTED_COLLECTION_OPTIONS and collection_metadata[self.OPTIONS][option_key] is True: + if self.UNSUPPORTED_COLLECTION_OPTIONS_KEY not in compatibility_issues[ + db_name][collection_name]: + compatibility_issues[db_name][collection_name][ + self. + UNSUPPORTED_COLLECTION_OPTIONS_KEY] = [] + + compatibility_issues[db_name][collection_name][ + self. 
+ UNSUPPORTED_COLLECTION_OPTIONS_KEY].append( + option_key) + + for index_name in collection_metadata[self.INDEXES]: + index = collection_metadata[self.INDEXES][index_name] + + # $ + collection_qualified_index_name = '{}${}'.format( + collection_name, index_name) + if len( + collection_qualified_index_name + ) > DocumentDbLimits.COLLECTION_QUALIFIED_INDEX_NAME_MAX_LENGTH and self.args.shorten_index_name is False: + message = '$ greater than {} characters'.format( + DocumentDbLimits. + COLLECTION_QUALIFIED_INDEX_NAME_MAX_LENGTH) + compatibility_issues[db_name][collection_name][ + index_name][self.EXCEEDED_LIMITS][ + message] = collection_qualified_index_name + + # .$ + fully_qualified_index_name = '{}${}'.format( + collection_namespace, index_name) + if len( + fully_qualified_index_name + ) > DocumentDbLimits.FULLY_QUALIFIED_INDEX_NAME_MAX_LENGTH and self.args.shorten_index_name is False: + message = '.$ greater than {} characters'.format( + DocumentDbLimits. + FULLY_QUALIFIED_INDEX_NAME_MAX_LENGTH) + compatibility_issues[db_name][collection_name][ + index_name][self.EXCEEDED_LIMITS][ + message] = fully_qualified_index_name + + # Check for indexes with too many keys + if len(index) > DocumentDbLimits.COMPOUND_INDEX_MAX_KEYS: + message = 'Index contains more than {} keys'.format( + DocumentDbLimits.COMPOUND_INDEX_MAX_KEYS) + compatibility_issues[db_name][collection_name][ + index_name][self.EXCEEDED_LIMITS][message] = len( + index) + + for key_name in index: + # Check for index key names that are too long + if len(key_name + ) > DocumentDbLimits.INDEX_KEY_MAX_LENGTH: + message = 'Key name greater than {} characters'.format( + DocumentDbLimits.INDEX_KEY_MAX_LENGTH) + compatibility_issues[db_name][collection_name][ + index_name][ + self.EXCEEDED_LIMITS][message] = key_name + + # Check for unsupported index options like collation + if key_name in DocumentDbUnsupportedFeatures.UNSUPPORTED_INDEX_OPTIONS: + if self.UNSUPPORTED_INDEX_OPTIONS_KEY not in compatibility_issues[ + db_name][collection_name][index_name]: + compatibility_issues[db_name][collection_name][ + index_name][ + self. + UNSUPPORTED_INDEX_OPTIONS_KEY] = [] + + compatibility_issues[db_name][collection_name][ + index_name][ + self.UNSUPPORTED_INDEX_OPTIONS_KEY].append( + key_name) + + # Check for unsupported index types + if key_name == self.INDEX_KEY: + for index_key_name in index[key_name]: + key_value = index[key_name][index_key_name] + + if key_value in DocumentDbUnsupportedFeatures.UNSUPPORTED_INDEX_TYPES: + compatibility_issues[db_name][ + collection_name][index_name][ + self. 
+ UNSUPPORTED_INDEX_TYPES_KEY] = key_value + + return compatibility_issues + + def _restore_indexes(self, connection, metadata): + """Restore compatible indexes to a DocumentDB instance""" + for db_name in metadata: + for collection_name in metadata[db_name]: + for index_name in metadata[db_name][collection_name][ + self.INDEXES]: + # convert the keys dict to a list of tuples as pymongo requires + index_keys = metadata[db_name][collection_name][ + self.INDEXES][index_name][self.INDEX_KEY] + keys_to_create = [] + index_options = OrderedDict() + + # $ + collection_qualified_index_name = '{}${}'.format(collection_name, index_name) + # ..$ + fully_qualified_index_name = '{}.{}.${}'.format(db_name, collection_name, index_name) + + if (len(collection_qualified_index_name) > DocumentDbLimits.COLLECTION_QUALIFIED_INDEX_NAME_MAX_LENGTH or + len(fully_qualified_index_name) > DocumentDbLimits.FULLY_QUALIFIED_INDEX_NAME_MAX_LENGTH): + short_index_name = index_name[:(DocumentDbLimits.COLLECTION_QUALIFIED_INDEX_NAME_MAX_LENGTH - + (len(collection_name)+6))] +''.join(random.choices(alphabet, k=5)) + index_options[self.INDEX_NAME] = short_index_name + else: + index_options[self.INDEX_NAME] = index_name + + for key in index_keys: + index_direction = index_keys[key] + + if type(index_direction) is float: + index_direction = int(index_direction) + elif type(index_direction) is dict and '$numberInt' in index_direction: + index_direction = int(index_direction['$numberInt']) + elif type(index_direction) is dict and '$numberDouble' in index_direction: + index_direction = int(float(index_direction['$numberDouble'])) + + keys_to_create.append((key, index_direction)) + + for k in metadata[db_name][collection_name][ + self.INDEXES][index_name]: + if k != self.INDEX_KEY and k != self.INDEX_VERSION and k not in DocumentDbUnsupportedFeatures.IGNORED_INDEX_OPTIONS: + # this key is an additional index option + index_options[k] = metadata[db_name][ + collection_name][self.INDEXES][index_name][k] + + if self.args.dry_run is True: + if self.args.skip_id_indexes and index_options[self.INDEX_NAME] == '_id_': + logging.info("(dry run) skipping _id index creation on %s.%s",db_name,collection_name) + else: + logging.info( + "(dry run) %s.%s: would attempt to add index: %s", + db_name, collection_name, index_options[self.INDEX_NAME] ) + logging.info(" (dry run) index options: %s", index_options) + logging.info(" (dry run) index keys: %s", keys_to_create) + else: + if self.args.skip_id_indexes and index_options[self.INDEX_NAME] == '_id_': + logging.info("Skipping _id index creation on %s.%s",db_name,collection_name) + else: + logging.debug("Adding index %s -> %s", keys_to_create, + index_options) + database = connection[db_name] + collection = database[collection_name] + collection.create_index(keys_to_create, + **index_options) + logging.info("%s.%s: added index: %s", db_name, + collection_name, index_options[self.INDEX_NAME] ) + + def run(self): + """Entry point + """ + metadata = None + compatibility_issues = None + connection = None + + # get a connection to our source mongodb or destination DocumentDb + if self.args.dump_indexes is True or self.args.restore_indexes is True: + try: + connection = self._get_db_connection(self.args.uri) + except (ConnectionFailure, ServerSelectionTimeoutError, + OperationFailure) as cex: + logging.error("Connection to instance failed: %s", cex) + sys.exit() + + # dump indexes from a MongoDB server + if self.args.dump_indexes is True: + self._dump_indexes_from_server(connection, self.args.dir, + 
self.args.dry_run) + sys.exit() + + # all non-dump operations require valid source metadata + try: + metadata = self.get_metadata(self.args.dir) + compatibility_issues = self.find_compatibility_issues(metadata) + except Exception as ex: + logging.error("Failed to load collection metadata: %s", ex) + sys.exit() + + # Apply indexes to a DocumentDB instance + if self.args.restore_indexes is True: + metadata_to_restore = metadata + + if self.args.skip_incompatible is not True: + if compatibility_issues: + logging.error( + "incompatible indexes exist and --skip-incompatible not specified." + ) + logging.error( + "force support of 2dsphere indexes by including --support-2dsphere" + ) + sys.exit() + else: + metadata_to_restore = self._get_compatible_metadata( + metadata, compatibility_issues) + + self._restore_indexes(connection, metadata_to_restore) + sys.exit() + + # find and print a summary or detail or compatibility issues + if self.args.show_issues is True: + if not compatibility_issues: + logging.info("No incompatibilities found.") + else: + logging.info( + json.dumps(compatibility_issues, + sort_keys=True, + indent=4, + separators=(',', ': '))) + sys.exit() + + # print all compatible (restorable) collections and indexes + if self.args.show_compatible is True: + compatible_metadata = self._get_compatible_metadata( + metadata, compatibility_issues) + logging.info( + json.dumps(compatible_metadata, + sort_keys=True, + indent=4, + separators=(',', ': '))) + + +def main(): + parser = argparse.ArgumentParser(description='Dump and restore indexes from MongoDB to DocumentDB.') + + parser.add_argument('--debug',required=False,action='store_true',help='Output debugging information') + parser.add_argument('--dry-run',required=False,action='store_true',help='Perform processing, but do not actually export or restore indexes') + parser.add_argument('--uri',required=False,type=str,help='URI to connect to MongoDB or Amazon DocumentDB') + parser.add_argument('--dir',required=True,type=str,help='Specify the folder to export to or restore from (required)') + parser.add_argument('--show-compatible',required=False,action='store_true',dest='show_compatible',help='Output all compatible indexes with Amazon DocumentDB (no change is applied)') + parser.add_argument('--show-issues',required=False,action='store_true',dest='show_issues',help='Output a report of compatibility issues found') + parser.add_argument('--dump-indexes',required=False,action='store_true',help='Perform index export from the specified server') + parser.add_argument('--restore-indexes',required=False,action='store_true',help='Restore indexes found in metadata to the specified server') + parser.add_argument('--skip-incompatible',required=False,action='store_true',help='Skip incompatible indexes when restoring metadata') + parser.add_argument('--support-2dsphere',required=False,action='store_true',help='Support 2dsphere indexes creation (collections data must use GeoJSON Point type for indexing)') + parser.add_argument('--skip-python-version-check',required=False,action='store_true',help='Permit execution on Python 3.6 and prior') + parser.add_argument('--shorten-index-name',required=False,action='store_true',help='Shorten long index name to compatible length') + parser.add_argument('--skip-id-indexes',required=False,action='store_true',help='Do not create _id indexes') + + args = parser.parse_args() + + MIN_PYTHON = (3, 7) + if (not args.skip_python_version_check) and (sys.version_info < MIN_PYTHON): + sys.exit("\nPython %s.%s or later is 
required.\n" % MIN_PYTHON) + + if (args.dump_indexes or args.restore_indexes) and (args.uri is None): + message = "Must specify --uri when dumping or restoring indexes" + parser.error(message) + + if not (args.dump_indexes or args.restore_indexes or args.show_issues or args.show_compatible): + message = "Must specify one of [--dump-indexes | --restore-indexes | --show-issues | --show-compatible]" + parser.error(message) + + if args.dir is not None: + if not os.path.isdir(args.dir): + parser.error("--dir must specify a directory") + + if args.dump_indexes is True: + if args.restore_indexes is True: + parser.error("Cannot dump and restore indexes simultaneously") + + if args.support_2dsphere: + # 2dsphere supported, remove from unsupported + DocumentDbUnsupportedFeatures.UNSUPPORTED_INDEX_TYPES.remove('2dsphere') + + indextool = DocumentDbIndexTool(args) + indextool.run() + + +if __name__ == "__main__": + main() diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess-filtered.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess-filtered.py new file mode 100644 index 0000000..d6b66ea --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess-filtered.py @@ -0,0 +1,273 @@ +from datetime import datetime, timedelta +import os +import sys +import time +import pymongo +from bson.timestamp import Timestamp +from bson.objectid import ObjectId +import threading +import multiprocessing as mp +import hashlib +import argparse + + +def logIt(threadnum, message): + logTimeStamp = datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{}] thread {:>3d} | {}".format(logTimeStamp,threadnum,message)) + + +def full_load_loader(threadnum, appConfig, perfQ): + if appConfig['verboseLogging']: + logIt(threadnum,'thread started') + + sourceConnection = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrfull') + sourceDb = sourceConnection[appConfig["sourceNs"].split('.',1)[0]] + sourceColl = sourceDb[appConfig["sourceNs"].split('.',1)[1]] + + destConnection = pymongo.MongoClient(host=appConfig["targetUri"],appname='migrfull') + destDatabase = destConnection[appConfig["targetNs"].split('.',1)[0]] + destCollection = destDatabase[appConfig["targetNs"].split('.',1)[1]] + + startTime = time.time() + lastFeedback = time.time() + + bulkOpList = [] + + # list with replace, not insert, in case document already exists (replaying old oplog) + bulkOpListReplace = [] + numCurrentBulkOps = 0 + + numTotalBatches = 0 + + myCollectionOps = 0 + + if appConfig['verboseLogging']: + logIt(threadnum,"Creating cursor") + + ttlDateTime=datetime(2024,6,30,0,0,0,0) + + if (threadnum == 0): + # thread 0 = $lte only + #cursor = sourceColl.find({'_id': {'$lte': appConfig['boundaries'][threadnum]}}) + #cursor = sourceColl.find({'_id': {'$lte': appConfig['boundaries'][threadnum]},"ttl":{"$gt":ttlDateTime}},hint=[('_id',pymongo.ASCENDING)]) + cursor = sourceColl.find({'_id': {'$lte': appConfig['boundaries'][threadnum]}},hint=[('_id',pymongo.ASCENDING)]) + elif (threadnum == appConfig['numProcessingThreads'] - 1): + # last processor = $gt only + #cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1]}}) + #cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1]},"ttl":{"$gt":ttlDateTime}},hint=[('_id',pymongo.ASCENDING)]) + cursor = sourceColl.find({'_id': {'$gt': 
appConfig['boundaries'][threadnum-1]}},hint=[('_id',pymongo.ASCENDING)]) + else: + # last processor = $gt prior, $lte next + #cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1], '$lte': appConfig['boundaries'][threadnum]}}) + #cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1], '$lte': appConfig['boundaries'][threadnum]},"ttl":{"$gt":ttlDateTime}},hint=[('_id',pymongo.ASCENDING)]) + cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1], '$lte': appConfig['boundaries'][threadnum]}},hint=[('_id',pymongo.ASCENDING)]) + + perfQ.put({"name":"findCompleted","processNum":threadnum}) + + for doc in cursor: + if ('ttl' in doc) and (doc['ttl'] < ttlDateTime): + # skip old documents + continue + + myCollectionOps += 1 + bulkOpList.append(pymongo.InsertOne(doc)) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + #bulkOpListReplace.append(pymongo.ReplaceOne(doc['_id'],doc,upsert=True)) + numCurrentBulkOps += 1 + + if (numCurrentBulkOps >= appConfig["maxInsertsPerBatch"]): + if not appConfig['dryRun']: + # try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + # except: + # # replace inserts as replaces + # result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + if (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + # try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + # except: + # # replace inserts as replaces + # result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + perfQ.put({"name":"processCompleted","processNum":threadnum}) + + +def reporter(appConfig, perfQ): + if appConfig['verboseLogging']: + logIt(-1,'reporting thread started') + + startTime = time.time() + lastTime = time.time() + + lastProcessedOplogEntries = 0 + nextReportTime = startTime + appConfig["feedbackSeconds"] + + numWorkersCompleted = 0 + numWorkersLoading = 0 + numProcessedOplogEntries = 0 + + while (numWorkersCompleted < appConfig["numProcessingThreads"]): + time.sleep(appConfig["feedbackSeconds"]) + nowTime = time.time() + + while not perfQ.empty(): + qMessage = perfQ.get_nowait() + if qMessage['name'] == "batchCompleted": + numProcessedOplogEntries += qMessage['operations'] + elif qMessage['name'] == "processCompleted": + numWorkersCompleted += 1 + elif qMessage['name'] == "findCompleted": + numWorkersLoading += 1 + + # total total + elapsedSeconds = nowTime - startTime + totalOpsPerSecond = numProcessedOplogEntries / elapsedSeconds + + # elapsed hours, minutes, seconds + thisHours, rem = divmod(elapsedSeconds, 3600) + thisMinutes, thisSeconds = divmod(rem, 60) + thisHMS = "{:0>2}:{:0>2}:{:05.2f}".format(int(thisHours),int(thisMinutes),thisSeconds) + + # this interval + intervalElapsedSeconds = nowTime - lastTime + intervalOpsPerSecond = (numProcessedOplogEntries - lastProcessedOplogEntries) / intervalElapsedSeconds + + logTimeStamp = datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{0}] elapsed {1} | total o/s {2:12,.2f} | interval o/s {3:12,.2f} | tot ops {4:16,d} | loading 
{5:5d}".format(logTimeStamp,thisHMS,totalOpsPerSecond,intervalOpsPerSecond,numProcessedOplogEntries,numWorkersLoading)) + nextReportTime = nowTime + appConfig["feedbackSeconds"] + + lastTime = nowTime + lastProcessedOplogEntries = numProcessedOplogEntries + + +def main(): + parser = argparse.ArgumentParser(description='Full Load migration tool.') + + parser.add_argument('--skip-python-version-check', + required=False, + action='store_true', + help='Permit execution on Python 3.6 and prior') + + parser.add_argument('--source-uri', + required=True, + type=str, + help='Source URI') + + parser.add_argument('--target-uri', + required=True, + type=str, + help='Target URI') + + parser.add_argument('--source-namespace', + required=True, + type=str, + help='Source Namespace as .') + + parser.add_argument('--target-namespace', + required=False, + type=str, + help='Target Namespace as ., defaults to --source-namespace') + + parser.add_argument('--feedback-seconds', + required=False, + type=int, + default=60, + help='Number of seconds between feedback output') + + parser.add_argument('--max-inserts-per-batch', + required=False, + type=int, + default=100, + help='Maximum number of inserts to include in a single batch') + + parser.add_argument('--dry-run', + required=False, + action='store_true', + help='Read source changes only, do not apply to target') + + parser.add_argument('--verbose', + required=False, + action='store_true', + help='Enable verbose logging') + + parser.add_argument('--boundaries', + required=True, + type=str, + help='Boundaries for segmenting') + + parser.add_argument('--boundary-datatype', + required=False, + type=str, + default='objectid', + choices=['objectid','string','int'], + help='Boundaries for segmenting') + + + args = parser.parse_args() + + MIN_PYTHON = (3, 7) + if (not args.skip_python_version_check) and (sys.version_info < MIN_PYTHON): + sys.exit("\nPython %s.%s or later is required.\n" % MIN_PYTHON) + + appConfig = {} + appConfig['sourceUri'] = args.source_uri + appConfig['targetUri'] = args.target_uri + appConfig['maxInsertsPerBatch'] = args.max_inserts_per_batch + appConfig['feedbackSeconds'] = args.feedback_seconds + appConfig['dryRun'] = args.dry_run + appConfig['sourceNs'] = args.source_namespace + if not args.target_namespace: + appConfig['targetNs'] = args.source_namespace + else: + appConfig['targetNs'] = args.target_namespace + appConfig['verboseLogging'] = args.verbose + appConfig['boundaryDatatype'] = args.boundary_datatype + + boundaryList = args.boundaries.split(',') + appConfig['boundaries'] = [] + for thisBoundary in boundaryList: + if appConfig['boundaryDatatype'] == 'objectid': + appConfig['boundaries'].append(ObjectId(thisBoundary)) + elif appConfig['boundaryDatatype'] == 'string': + appConfig['boundaries'].append(thisBoundary) + else: + appConfig['boundaries'].append(int(thisBoundary)) + + appConfig['numProcessingThreads'] = len(appConfig['boundaries'])+1 + + logIt(-1,"processing using {} threads".format(appConfig['numProcessingThreads'])) + + mp.set_start_method('spawn') + q = mp.Manager().Queue() + + t = threading.Thread(target=reporter,args=(appConfig,q)) + t.start() + + processList = [] + for loop in range(appConfig["numProcessingThreads"]): + p = mp.Process(target=full_load_loader,args=(loop,appConfig,q)) + processList.append(p) + + for process in processList: + process.start() + + for process in processList: + process.join() + + t.join() + + +if __name__ == "__main__": + main() diff --git 
a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess.py new file mode 100644 index 0000000..bb17098 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/fl-multiprocess.py @@ -0,0 +1,326 @@ +from datetime import datetime, timedelta +import os +import sys +import time +import pymongo +from bson.timestamp import Timestamp +from bson.objectid import ObjectId +import threading +import multiprocessing as mp +import hashlib +import argparse +import boto3 +import warnings + + +def logIt(threadnum, message): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + logTimeStamp = datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{}] thread {:>3d} | {}".format(logTimeStamp,threadnum,message)) + + +def getCollectionCount(appConfig): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + sourceDb = appConfig["sourceNs"].split('.',1)[0] + sourceColl = appConfig["sourceNs"].split('.',1)[1] + client = pymongo.MongoClient(appConfig['sourceUri']) + db = client[sourceDb] + collStats = db.command("collStats", sourceColl) + client.close() + return max(collStats['count'],1) + + +def full_load_loader(threadnum, appConfig, perfQ): + warnings.filterwarnings("ignore","You appear to be connected to a DocumentDB cluster.") + + if appConfig['verboseLogging']: + logIt(threadnum,'thread started') + + sourceConnection = pymongo.MongoClient(host=appConfig["sourceUri"],appname='migrfull') + sourceDb = sourceConnection[appConfig["sourceNs"].split('.',1)[0]] + sourceColl = sourceDb[appConfig["sourceNs"].split('.',1)[1]] + + destConnection = pymongo.MongoClient(host=appConfig["targetUri"],appname='migrfull') + destDatabase = destConnection[appConfig["targetNs"].split('.',1)[0]] + destCollection = destDatabase[appConfig["targetNs"].split('.',1)[1]] + + startTime = time.time() + lastFeedback = time.time() + + bulkOpList = [] + + # list with replace, not insert, in case document already exists (replaying old oplog) + bulkOpListReplace = [] + numCurrentBulkOps = 0 + + numTotalBatches = 0 + + myCollectionOps = 0 + + if appConfig['verboseLogging']: + logIt(threadnum,"Creating cursor") + + if (threadnum == 0): + # thread 0 = $lte only + cursor = sourceColl.find({'_id': {'$lte': appConfig['boundaries'][threadnum]}}) + elif (threadnum == appConfig['numProcessingThreads'] - 1): + # last processor = $gt only + cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1]}}) + else: + # last processor = $gt prior, $lte next + cursor = sourceColl.find({'_id': {'$gt': appConfig['boundaries'][threadnum-1], '$lte': appConfig['boundaries'][threadnum]}}) + + perfQ.put({"name":"findCompleted","processNum":threadnum}) + + for doc in cursor: + myCollectionOps += 1 + bulkOpList.append(pymongo.InsertOne(doc)) + # if playing old oplog, need to change inserts to be replaces (the inserts will fail due to _id uniqueness) + #bulkOpListReplace.append(pymongo.ReplaceOne(doc['_id'],doc,upsert=True)) + numCurrentBulkOps += 1 + + if (numCurrentBulkOps >= appConfig["maxInsertsPerBatch"]): + if not appConfig['dryRun']: + # try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + # except: + # # replace inserts as replaces + # result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + 
perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + if (numCurrentBulkOps > 0): + if not appConfig['dryRun']: + # try: + result = destCollection.bulk_write(bulkOpList,ordered=True) + # except: + # # replace inserts as replaces + # result = destCollection.bulk_write(bulkOpListReplace,ordered=True) + perfQ.put({"name":"batchCompleted","operations":numCurrentBulkOps,"processNum":threadnum}) + bulkOpList = [] + bulkOpListReplace = [] + numCurrentBulkOps = 0 + numTotalBatches += 1 + + perfQ.put({"name":"processCompleted","processNum":threadnum}) + + +def reporter(appConfig, perfQ): + createCloudwatchMetrics = appConfig['createCloudwatchMetrics'] + numDocumentsToMigrate = appConfig['numDocumentsToMigrate'] + clusterName = appConfig['clusterName'] + + if appConfig['verboseLogging']: + logIt(-1,'reporting thread started') + + if createCloudwatchMetrics: + # only instantiate client if needed + cloudWatchClient = boto3.client('cloudwatch') + + startTime = time.time() + lastTime = time.time() + + # number of seconds between posting metrics to cloudwatch + cloudwatchPutSeconds = 30 + lastCloudwatchPutTime = time.time() + + lastProcessedOplogEntries = 0 + nextReportTime = startTime + appConfig["feedbackSeconds"] + + numWorkersCompleted = 0 + numWorkersLoading = 0 + numProcessedOplogEntries = 0 + + while (numWorkersCompleted < appConfig["numProcessingThreads"]): + time.sleep(appConfig["feedbackSeconds"]) + nowTime = time.time() + + while not perfQ.empty(): + qMessage = perfQ.get_nowait() + if qMessage['name'] == "batchCompleted": + numProcessedOplogEntries += qMessage['operations'] + elif qMessage['name'] == "processCompleted": + numWorkersCompleted += 1 + numWorkersLoading -= 1 + elif qMessage['name'] == "findCompleted": + numWorkersLoading += 1 + + # total total + elapsedSeconds = nowTime - startTime + totalOpsPerSecond = numProcessedOplogEntries / elapsedSeconds + + # estimated time to done + if numProcessedOplogEntries > 0: + pctDone = max(numProcessedOplogEntries / numDocumentsToMigrate,0.001) + remainingSeconds = max(int(elapsedSeconds / pctDone) - elapsedSeconds,0) + else: + remainingSeconds = 0 + + thisHours, rem = divmod(remainingSeconds, 3600) + thisMinutes, thisSeconds = divmod(rem, 60) + remainHMS = "{:0>2}:{:0>2}:{:0>2}".format(int(thisHours),int(thisMinutes),int(thisSeconds)) + + if (numDocumentsToMigrate == 0): + pctDone = 100.0 + else: + pctDone = (numProcessedOplogEntries / numDocumentsToMigrate) * 100.0 + + # elapsed hours, minutes, seconds + thisHours, rem = divmod(elapsedSeconds, 3600) + thisMinutes, thisSeconds = divmod(rem, 60) + thisHMS = "{:0>2}:{:0>2}:{:05.2f}".format(int(thisHours),int(thisMinutes),thisSeconds) + + # this interval + intervalElapsedSeconds = nowTime - lastTime + intervalOpsPerSecond = (numProcessedOplogEntries - lastProcessedOplogEntries) / intervalElapsedSeconds + + logTimeStamp = datetime.utcnow().isoformat()[:-3] + 'Z' + print("[{0}] elapsed {1} | total o/s {2:12,.2f} | interval o/s {3:12,.2f} | tot ops {4:16,d} | loading {5:5d} | pct {6:6.2f}% | done in {7}".format(logTimeStamp,thisHMS,totalOpsPerSecond,intervalOpsPerSecond,numProcessedOplogEntries,numWorkersLoading,pctDone,remainHMS)) + nextReportTime = nowTime + appConfig["feedbackSeconds"] + + lastTime = nowTime + lastProcessedOplogEntries = numProcessedOplogEntries + + # output CW metrics every cloudwatchPutSeconds seconds + if createCloudwatchMetrics and ((time.time() - 
lastCloudwatchPutTime) > cloudwatchPutSeconds): + # log to cloudwatch + cloudWatchClient.put_metric_data( + Namespace='CustomDocDB', + MetricData=[{'MetricName':'MigratorFLInsertsPerSecond','Dimensions':[{'Name':'Cluster','Value':clusterName}],'Value':intervalOpsPerSecond,'StorageResolution':60}, + {'MetricName':'MigratorFLRemainingSeconds','Dimensions':[{'Name':'Cluster','Value':clusterName}],'Value':remainingSeconds,'StorageResolution':60}]) + + lastCloudwatchPutTime = time.time() + + +def main(): + parser = argparse.ArgumentParser(description='Full Load migration tool.') + + parser.add_argument('--skip-python-version-check', + required=False, + action='store_true', + help='Permit execution on Python 3.6 and prior') + + parser.add_argument('--source-uri', + required=True, + type=str, + help='Source URI') + + parser.add_argument('--target-uri', + required=True, + type=str, + help='Target URI') + + parser.add_argument('--source-namespace', + required=True, + type=str, + help='Source Namespace as .') + + parser.add_argument('--target-namespace', + required=False, + type=str, + help='Target Namespace as ., defaults to --source-namespace') + + parser.add_argument('--feedback-seconds', + required=False, + type=int, + default=60, + help='Number of seconds between feedback output') + + parser.add_argument('--max-inserts-per-batch', + required=False, + type=int, + default=100, + help='Maximum number of inserts to include in a single batch') + + parser.add_argument('--dry-run', + required=False, + action='store_true', + help='Read source changes only, do not apply to target') + + parser.add_argument('--verbose', + required=False, + action='store_true', + help='Enable verbose logging') + + parser.add_argument('--boundaries', + required=True, + type=str, + help='Boundaries for segmenting') + + parser.add_argument('--boundary-datatype', + required=False, + type=str, + default='objectid', + choices=['objectid','string','int'], + help='Boundaries for segmenting') + + parser.add_argument('--create-cloudwatch-metrics',required=False,action='store_true',help='Create CloudWatch metrics when garbage collection is active') + parser.add_argument('--cluster-name',required=False,type=str,help='Name of cluster for CloudWatch metrics') + + args = parser.parse_args() + + MIN_PYTHON = (3, 7) + if (not args.skip_python_version_check) and (sys.version_info < MIN_PYTHON): + sys.exit("\nPython %s.%s or later is required.\n" % MIN_PYTHON) + + if args.create_cloudwatch_metrics and (args.cluster_name is None): + sys.exit("\nMust supply --cluster-name when capturing CloudWatch metrics.\n") + + appConfig = {} + appConfig['sourceUri'] = args.source_uri + appConfig['targetUri'] = args.target_uri + appConfig['maxInsertsPerBatch'] = args.max_inserts_per_batch + appConfig['feedbackSeconds'] = args.feedback_seconds + appConfig['dryRun'] = args.dry_run + appConfig['sourceNs'] = args.source_namespace + if not args.target_namespace: + appConfig['targetNs'] = args.source_namespace + else: + appConfig['targetNs'] = args.target_namespace + appConfig['verboseLogging'] = args.verbose + appConfig['boundaryDatatype'] = args.boundary_datatype + appConfig['createCloudwatchMetrics'] = args.create_cloudwatch_metrics + appConfig['clusterName'] = args.cluster_name + + boundaryList = args.boundaries.split(',') + appConfig['boundaries'] = [] + for thisBoundary in boundaryList: + if appConfig['boundaryDatatype'] == 'objectid': + appConfig['boundaries'].append(ObjectId(thisBoundary)) + elif appConfig['boundaryDatatype'] == 'string': + 
appConfig['boundaries'].append(thisBoundary) + else: + appConfig['boundaries'].append(int(thisBoundary)) + + appConfig['numProcessingThreads'] = len(appConfig['boundaries'])+1 + appConfig['numDocumentsToMigrate'] = getCollectionCount(appConfig) + + logIt(-1,"processing using {} threads".format(appConfig['numProcessingThreads'])) + + mp.set_start_method('spawn') + q = mp.Manager().Queue() + + t = threading.Thread(target=reporter,args=(appConfig,q)) + t.start() + + processList = [] + for loop in range(appConfig["numProcessingThreads"]): + p = mp.Process(target=full_load_loader,args=(loop,appConfig,q)) + processList.append(p) + + for process in processList: + process.start() + + for process in processList: + process.join() + + t.join() + + +if __name__ == "__main__": + main() diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py new file mode 100644 index 0000000..03f5e8b --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py @@ -0,0 +1,156 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. + +"""AWS Labs DocumentDB Migration MCP Server implementation for migrating data to AWS DocumentDB.""" + +import argparse +import os +import sys +from awslabs.documentdb_migration_mcp_server.full_load_tools import run_full_load, run_filtered_full_load +from awslabs.documentdb_migration_mcp_server.cdc_tools import run_cdc, get_resume_token +from awslabs.documentdb_migration_mcp_server.boundary_tools import generate_boundaries +from awslabs.documentdb_migration_mcp_server.index_tools import ( + dump_indexes, restore_indexes, show_compatibility_issues, show_compatible_indexes +) +from loguru import logger +from mcp.server.fastmcp import FastMCP + + +# Create the FastMCP server +mcp = FastMCP( + 'awslabs.documentdb-migration-mcp-server', + instructions="""DocumentDB Migration MCP Server provides tools to migrate data to AWS DocumentDB. + + Usage pattern: + 1. For full load migrations, use the `runFullLoad` or `runFilteredFullLoad` tools + - Boundaries will be auto-generated if not provided + 2. For CDC (Change Data Capture) migrations, use the `runCDC` tool + 3. To get a change stream resume token for CDC, use the `getResumeToken` tool + 4. To generate boundaries for segmenting collections, use the `generateBoundaries` tool + 5. 
For index management: + - To dump indexes from a source database, use the `dumpIndexes` tool + - To restore indexes to a target database, use the `restoreIndexes` tool + - To check index compatibility with DocumentDB, use the `showIndexCompatibilityIssues` tool + - To show compatible indexes, use the `showCompatibleIndexes` tool + + Server Configuration: + - The server requires access to the migration scripts in the scripts directory.""", + dependencies=[ + 'pydantic', + 'loguru', + 'pymongo', + 'boto3', + ], +) + + +# Register all tools + +# Full Load tools +mcp.tool(name='runFullLoad')(run_full_load) +mcp.tool(name='runFilteredFullLoad')(run_filtered_full_load) + +# CDC tools +mcp.tool(name='runCDC')(run_cdc) +mcp.tool(name='getResumeToken')(get_resume_token) + +# Boundary tools +mcp.tool(name='generateBoundaries')(generate_boundaries) + +# Index tools +mcp.tool(name='dumpIndexes')(dump_indexes) +mcp.tool(name='restoreIndexes')(restore_indexes) +mcp.tool(name='showIndexCompatibilityIssues')(show_compatibility_issues) +mcp.tool(name='showCompatibleIndexes')(show_compatible_indexes) + + +def main(): + """Run the MCP server with CLI argument support.""" + parser = argparse.ArgumentParser( + description='An AWS Labs Model Context Protocol (MCP) server for DocumentDB Migration' + ) + parser.add_argument('--sse', action='store_true', help='Use SSE transport') + parser.add_argument('--port', type=int, default=8889, help='Port to run the server on') + parser.add_argument('--host', type=str, default='127.0.0.1', help='Host to bind the server to') + parser.add_argument( + '--log-level', + type=str, + default='INFO', + choices=['TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'], + help='Set the logging level', + ) + parser.add_argument( + '--scripts-dir', + type=str, + default=None, + help='Directory containing the migration scripts (default: scripts subdirectory)', + ) + parser.add_argument( + '--aws-profile', + type=str, + default=None, + help='AWS profile to use for AWS services including DocumentDB and CloudWatch', + ) + parser.add_argument( + '--aws-region', + type=str, + default=None, + help='AWS region to use for AWS services including DocumentDB and CloudWatch', + ) + + args = parser.parse_args() + + # Configure logging + logger.remove() + logger.add( + lambda msg: print(msg), + level=args.log_level, + format='{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}', + ) + + logger.info(f'Starting DocumentDB Migration MCP Server on {args.host}:{args.port}') + logger.info(f'Log level: {args.log_level}') + + # Set up scripts directory + if args.scripts_dir: + scripts_dir = args.scripts_dir + else: + scripts_dir = os.path.join(os.path.dirname(__file__), "scripts") + + # Create scripts directory if it doesn't exist + os.makedirs(scripts_dir, exist_ok=True) + logger.info(f'Scripts directory: {scripts_dir}') + + # Set AWS profile and region if provided + if args.aws_profile: + os.environ['AWS_PROFILE'] = args.aws_profile + logger.info(f'Using AWS profile: {args.aws_profile}') + + if args.aws_region: + os.environ['AWS_REGION'] = args.aws_region + logger.info(f'Using AWS region: {args.aws_region}') + + try: + # Run server with appropriate transport + if args.sse: + mcp.settings.port = args.port + mcp.settings.host = args.host + mcp.run(transport='sse') + else: + mcp.settings.port = args.port + mcp.settings.host = args.host + mcp.run() + except Exception as e: + logger.critical(f'Failed to start server: {str(e)}') + + +if __name__ == '__main__': + main() 
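For reference, the worker-sharding scheme used by `cdc-multiprocess.py` above can be illustrated in isolation. This is a minimal sketch, not part of the patch: the helper name `owning_worker` is hypothetical, and it only demonstrates why the script hashes `documentKey` with SHA-512 rather than Python's built-in `hash()`, which is randomly seeded per process and would route the same key to different workers.

```python
import hashlib

def owning_worker(document_key: dict, num_workers: int) -> int:
    """Deterministically map a change event's documentKey to a worker index.

    A stable digest is required because the CDC workers run as separate
    processes; Python's built-in hash() is seeded per process and is not
    reproducible across them.
    """
    digest = hashlib.sha512(str(document_key).encode('utf-8')).hexdigest()
    return int(digest, 16) % num_workers

# The same document key always maps to the same worker in every process.
key = {'_id': '64b7f3e2a1c9d4e5f6a7b8c9'}
assert owning_worker(key, 4) == owning_worker(key, 4)
```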
From 91477373a735cc904509201bad0822becddda70c Mon Sep 17 00:00:00 2001
From: Inderpreet Singh
Date: Mon, 7 Jul 2025 17:12:14 -0700
Subject: [PATCH 2/6] Update .gitignore

---
 mcp-server/documentdb-migration-mcp/.gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/mcp-server/documentdb-migration-mcp/.gitignore b/mcp-server/documentdb-migration-mcp/.gitignore
index f397032..11090f8 100644
--- a/mcp-server/documentdb-migration-mcp/.gitignore
+++ b/mcp-server/documentdb-migration-mcp/.gitignore
@@ -1,4 +1,5 @@
 # Python cache files
+__pycache__/
 *.py[cod]
 
 # Build artifacts

From 202153aee09f4360124500b38debb4bcb570bc16 Mon Sep 17 00:00:00 2001
From: Inderpreet Singh
Date: Tue, 8 Jul 2025 11:12:12 -0700
Subject: [PATCH 3/6] Update README.md

---
 mcp-server/documentdb-migration-mcp/README.md | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/mcp-server/documentdb-migration-mcp/README.md b/mcp-server/documentdb-migration-mcp/README.md
index 35b4126..fc8f9a6 100644
--- a/mcp-server/documentdb-migration-mcp/README.md
+++ b/mcp-server/documentdb-migration-mcp/README.md
@@ -34,6 +34,25 @@ This MCP (Model Context Protocol) server provides tools for migrating data to Do
 }
 }
 ```
+We recommend that you also install the **Amazon DocumentDB MCP Server** alongside the Migration MCP Server. The DocumentDB MCP server helps with DML operations.
+
+```json
+{
+  "awslabs.documentdb-mcp-server": {
+    "command": "uvx",
+    "args": [
+      "awslabs.documentdb-mcp-server@latest"
+    ],
+    "env": {
+      "AWS_PROFILE": "default",
+      "AWS_REGION": "us-east-1",
+      "FASTMCP_LOG_LEVEL": "ERROR"
+    },
+    "disabled": false,
+    "autoApprove": []
+  }
+}
+```
 You can customize the AWS profile and region by changing the `AWS_PROFILE` and `AWS_REGION` environment variables.
 
 ### 2. Through your favorite AI Agentic tool using local file

From 0825a1433cb9e9338efa8bd4e7399cfa1ff610e4 Mon Sep 17 00:00:00 2001
From: Inderpreet Singh
Date: Tue, 8 Jul 2025 11:15:16 -0700
Subject: [PATCH 4/6] Update README.md

---
 mcp-server/documentdb-migration-mcp/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mcp-server/documentdb-migration-mcp/README.md b/mcp-server/documentdb-migration-mcp/README.md
index fc8f9a6..251cc90 100644
--- a/mcp-server/documentdb-migration-mcp/README.md
+++ b/mcp-server/documentdb-migration-mcp/README.md
@@ -13,7 +13,7 @@ This MCP (Model Context Protocol) server provides tools for migrating data to Do
 
 ## Installation
 
-### 1. Through your favorite AI Agentic tool (e.g., for Amazon Q Developer CLI MCP, Claude, etc.) using uv package (Recommended)
+### 1. Through your favorite AI Agentic tool (e.g., for Amazon Q Developer CLI, Cline, etc.)
using uv package (Recommended) ```json { From 908783644f1aef9eb69cd83a7da62dcff1ed0ebe Mon Sep 17 00:00:00 2001 From: Inderpreet Singh Date: Wed, 9 Jul 2025 12:41:51 -0700 Subject: [PATCH 5/6] upgraded to version 0.5.0 --- mcp-server/documentdb-migration-mcp/README.md | 25 +- .../awslabs/__init__.py | 10 + .../__init__.py | 10 + .../cdc_tools.py | 29 ++- .../full_load_tools.py | 57 +++- .../index_tools.py | 104 +++++++- .../migration_workflow.py | 244 ++++++++++++++++++ .../scripts/documentdb_index_tool.py | 16 +- .../documentdb_migration_mcp_server/server.py | 20 +- 9 files changed, 482 insertions(+), 33 deletions(-) create mode 100644 mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/migration_workflow.py diff --git a/mcp-server/documentdb-migration-mcp/README.md b/mcp-server/documentdb-migration-mcp/README.md index 251cc90..a8d05ff 100644 --- a/mcp-server/documentdb-migration-mcp/README.md +++ b/mcp-server/documentdb-migration-mcp/README.md @@ -4,6 +4,7 @@ This MCP (Model Context Protocol) server provides tools for migrating data to Do ## Features +- **Easy Migration Workflow**: Complete end-to-end migration workflow that combines index management and full load migration - **Full Load Migration**: Migrate data from a source database to DocumentDB in a one-time operation - **Filtered Full Load Migration**: Migrate data with filtering based on TTL - **Change Data Capture (CDC)**: Continuously replicate changes from a source database to DocumentDB @@ -108,6 +109,25 @@ python -m awslabs.documentdb_migration_mcp_server.server ## MCP Tools +### runEasyMigration + +Run a complete end-to-end migration workflow from source to target. + +**Parameters:** +- `source_uri`: Source URI in MongoDB Connection String format +- `target_uri`: Target URI in MongoDB Connection String format +- `source_namespace`: Source Namespace as . +- `target_namespace`: (Optional) Target Namespace as ., defaults to source_namespace +- `max_inserts_per_batch`: (Optional) Maximum number of inserts to include in a single batch, defaults to 100 +- `feedback_seconds`: (Optional) Number of seconds between feedback output, defaults to 60 +- `dry_run`: (Optional) Read source changes only, do not apply to target, defaults to false +- `verbose`: (Optional) Enable verbose logging, defaults to false +- `create_cloudwatch_metrics`: (Optional) Create CloudWatch metrics for monitoring, defaults to false +- `cluster_name`: (Optional) Name of cluster for CloudWatch metrics +- `skip_incompatible_indexes`: (Optional) Skip incompatible indexes when restoring metadata, defaults to true +- `support_2dsphere`: (Optional) Support 2dsphere indexes creation, defaults to false +- `skip_id_indexes`: (Optional) Do not create _id indexes, defaults to true + ### runFullLoad Run a full load migration from source to target. @@ -224,12 +244,15 @@ Show compatible indexes with Amazon DocumentDB. - `debug`: (Optional) Output debugging information ## Requirements - +- Install uv from [Astral] (https://docs.astral.sh/uv/getting-started/installation/) - Python 3.10+ - PyMongo - Boto3 (for CloudWatch metrics) - MCP Server +## Version +0.5.0 + ## License This project is licensed under the Apache License 2.0 - see the LICENSE file for details. 
\ No newline at end of file diff --git a/mcp-server/documentdb-migration-mcp/awslabs/__init__.py b/mcp-server/documentdb-migration-mcp/awslabs/__init__.py index d77b263..638d1f7 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/__init__.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/__init__.py @@ -1,3 +1,13 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. """AWS Labs namespace package.""" diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py index cfd66f6..c5f2493 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/__init__.py @@ -1,2 +1,12 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. 
"""AWS Labs DocumentDB Migration MCP Server package.""" diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py index eb1848b..1ce0b2e 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/cdc_tools.py @@ -15,6 +15,7 @@ import sys import time import subprocess +import tempfile from datetime import datetime from typing import Annotated, Any, Dict, List, Optional from pydantic import Field @@ -182,10 +183,31 @@ async def run_cdc( # Execute command try: logger.info(f"Executing command: {' '.join(cmd)}") + + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"cdc_{timestamp}.log") + + logger.info(f"Logging output to: {log_file_path}") + + # Open the log file + log_file = open(log_file_path, "w") + + # Start the process with stdout and stderr redirected to the log file process = subprocess.Popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=log_file, + stderr=log_file, text=True, bufsize=1, universal_newlines=True @@ -194,9 +216,10 @@ async def run_cdc( # Process is running in the background return { "success": True, - "message": "CDC migration started successfully", + "message": f"CDC migration started successfully. 
Logs are being written to {log_file_path}", "process_id": process.pid, "command": " ".join(cmd), + "log_file": log_file_path, } except Exception as e: logger.error(f"Error starting CDC migration: {str(e)}") diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py index 066b001..b2e256d 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/full_load_tools.py @@ -15,6 +15,7 @@ import sys import time import subprocess +import tempfile from datetime import datetime from typing import Annotated, Any, Dict, List, Optional from pydantic import Field @@ -170,10 +171,31 @@ async def run_full_load( # Execute command try: logger.info(f"Executing command: {' '.join(cmd)}") + + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"full_load_{timestamp}.log") + + logger.info(f"Logging output to: {log_file_path}") + + # Open the log file + log_file = open(log_file_path, "w") + + # Start the process with stdout and stderr redirected to the log file process = subprocess.Popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=log_file, + stderr=log_file, text=True, bufsize=1, universal_newlines=True @@ -182,9 +204,10 @@ async def run_full_load( # Process is running in the background return { "success": True, - "message": "Full load migration started successfully", + "message": f"Full load migration started successfully. 
Logs are being written to {log_file_path}", "process_id": process.pid, "command": " ".join(cmd), + "log_file": log_file_path, } except Exception as e: logger.error(f"Error starting full load migration: {str(e)}") @@ -321,10 +344,31 @@ async def run_filtered_full_load( # Execute command try: logger.info(f"Executing command: {' '.join(cmd)}") + + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"filtered_full_load_{timestamp}.log") + + logger.info(f"Logging output to: {log_file_path}") + + # Open the log file + log_file = open(log_file_path, "w") + + # Start the process with stdout and stderr redirected to the log file process = subprocess.Popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=log_file, + stderr=log_file, text=True, bufsize=1, universal_newlines=True @@ -333,9 +377,10 @@ async def run_filtered_full_load( # Process is running in the background return { "success": True, - "message": "Filtered full load migration started successfully", + "message": f"Filtered full load migration started successfully. Logs are being written to {log_file_path}", "process_id": process.pid, "command": " ".join(cmd), + "log_file": log_file_path, } except Exception as e: logger.error(f"Error starting filtered full load migration: {str(e)}") diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py index 2616830..86e33b0 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/index_tools.py @@ -91,12 +91,31 @@ async def dump_indexes( check=True ) + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"index_dump_{timestamp}.log") + + # Write stdout and stderr to the log file + with open(log_file_path, "w") as log_file: + log_file.write("STDOUT:\n") + log_file.write(result.stdout) + log_file.write("\nSTDERR:\n") + log_file.write(result.stderr) + return { "success": True, - "message": "Index dump completed successfully", + "message": f"Index dump completed successfully. 
Logs are available at {log_file_path}", "output_dir": output_dir, - "stdout": result.stdout, - "stderr": result.stderr, } except subprocess.CalledProcessError as e: logger.error(f"Error dumping indexes: {e.stderr}") @@ -215,11 +234,30 @@ async def restore_indexes( check=True ) + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"index_restore_{timestamp}.log") + + # Write stdout and stderr to the log file + with open(log_file_path, "w") as log_file: + log_file.write("STDOUT:\n") + log_file.write(result.stdout) + log_file.write("\nSTDERR:\n") + log_file.write(result.stderr) + return { "success": True, - "message": "Index restore completed successfully", - "stdout": result.stdout, - "stderr": result.stderr, + "message": f"Index restore completed successfully. Logs are available at {log_file_path}", } except subprocess.CalledProcessError as e: logger.error(f"Error restoring indexes: {e.stderr}") @@ -284,18 +322,37 @@ async def show_compatibility_issues( check=True ) + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"compatibility_check_{timestamp}.log") + + # Write stdout and stderr to the log file + with open(log_file_path, "w") as log_file: + log_file.write("STDOUT:\n") + log_file.write(result.stdout) + log_file.write("\nSTDERR:\n") + log_file.write(result.stderr) + # Try to parse the JSON output try: issues = json.loads(result.stdout) except json.JSONDecodeError: - issues = {"raw_output": result.stdout} + issues = {"raw_output": "See log file for details"} return { "success": True, - "message": "Compatibility check completed successfully", + "message": f"Compatibility check completed successfully. 
Logs are available at {log_file_path}", "issues": issues, - "stdout": result.stdout, - "stderr": result.stderr, } except subprocess.CalledProcessError as e: logger.error(f"Error checking compatibility: {e.stderr}") @@ -360,18 +417,37 @@ async def show_compatible_indexes( check=True ) + # Create a log file for the output + try: + # Try to use a directory in the user's home directory + log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs") + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + # Fall back to a temporary directory if home directory is not accessible + logger.warning(f"Could not create log directory in home directory: {str(e)}") + log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_") + logger.info(f"Using temporary directory for logs: {log_dir}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file_path = os.path.join(log_dir, f"compatible_indexes_{timestamp}.log") + + # Write stdout and stderr to the log file + with open(log_file_path, "w") as log_file: + log_file.write("STDOUT:\n") + log_file.write(result.stdout) + log_file.write("\nSTDERR:\n") + log_file.write(result.stderr) + # Try to parse the JSON output try: compatible_indexes = json.loads(result.stdout) except json.JSONDecodeError: - compatible_indexes = {"raw_output": result.stdout} + compatible_indexes = {"raw_output": "See log file for details"} return { "success": True, - "message": "Compatible indexes check completed successfully", + "message": f"Compatible indexes check completed successfully. Logs are available at {log_file_path}", "compatible_indexes": compatible_indexes, - "stdout": result.stdout, - "stderr": result.stderr, } except subprocess.CalledProcessError as e: logger.error(f"Error checking compatible indexes: {e.stderr}") diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/migration_workflow.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/migration_workflow.py new file mode 100644 index 0000000..c6cfa83 --- /dev/null +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/migration_workflow.py @@ -0,0 +1,244 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions +# and limitations under the License. 
+ +"""Migration workflow tools for DocumentDB Migration MCP Server.""" + +import os +import tempfile +import time +from typing import Annotated, Any, Dict, List, Optional +from pydantic import Field +from loguru import logger + +from awslabs.documentdb_migration_mcp_server.index_tools import ( + dump_indexes, restore_indexes, show_compatibility_issues +) +from awslabs.documentdb_migration_mcp_server.full_load_tools import run_full_load +# CDC tools are not used in this workflow +from awslabs.documentdb_migration_mcp_server.boundary_tools import generate_boundaries + + +async def run_easy_migration( + source_uri: Annotated[ + str, + Field( + description='Source URI in MongoDB Connection String format' + ), + ], + target_uri: Annotated[ + str, + Field( + description='Target URI in MongoDB Connection String format' + ), + ], + source_namespace: Annotated[ + str, + Field( + description='Source Namespace as .' + ), + ], + target_namespace: Annotated[ + str, + Field( + description='Target Namespace as ., defaults to source_namespace' + ), + ] = None, + max_inserts_per_batch: Annotated[ + int, + Field( + description='Maximum number of inserts to include in a single batch' + ), + ] = 100, + feedback_seconds: Annotated[ + int, + Field( + description='Number of seconds between feedback output' + ), + ] = 60, + dry_run: Annotated[ + bool, + Field( + description='Read source changes only, do not apply to target' + ), + ] = False, + verbose: Annotated[ + bool, + Field( + description='Enable verbose logging' + ), + ] = False, + create_cloudwatch_metrics: Annotated[ + bool, + Field( + description='Create CloudWatch metrics for monitoring' + ), + ] = False, + cluster_name: Annotated[ + str, + Field( + description='Name of cluster for CloudWatch metrics' + ), + ] = None, + skip_incompatible_indexes: Annotated[ + bool, + Field( + description='Skip incompatible indexes when restoring metadata' + ), + ] = True, + support_2dsphere: Annotated[ + bool, + Field( + description='Support 2dsphere indexes creation (collections must use GeoJSON Point type for indexing)' + ), + ] = False, + skip_id_indexes: Annotated[ + bool, + Field( + description='Do not create _id indexes' + ), + ] = True, +) -> Dict[str, Any]: + """Run an easy migration workflow from source to target. + + This tool executes a complete migration workflow: + 1. Check compatibility of indexes + 2. Dump indexes from source + 3. Restore indexes to target + 4. Run full load migration + + After the migration is complete, you can use the getResumeToken and runCDC tools + to set up Change Data Capture (CDC) for continuous replication. 
+ + Returns: + Dict[str, Any]: Status of the migration operation + """ + results = {} + + # Set target_namespace to source_namespace if not provided + if not target_namespace: + target_namespace = source_namespace + + # Step 1: Create a temporary directory for index operations + index_dir = tempfile.mkdtemp(prefix="migration_indexes_") + logger.info(f"Created temporary directory for index operations: {index_dir}") + results["index_dir"] = index_dir + + # Step 2: Dump indexes from source + logger.info("Step 1/5: Dumping indexes from source...") + dump_result = await dump_indexes( + uri=source_uri, + output_dir=index_dir, + dry_run=dry_run, + debug=verbose + ) + # Only include essential information in the results + results["dump_indexes"] = { + "success": dump_result["success"], + "message": dump_result["message"], + "output_dir": dump_result.get("output_dir", "") + } + + if not dump_result["success"]: + logger.error("Failed to dump indexes from source") + return results + + # Step 3: Check compatibility of indexes + logger.info("Step 2/5: Checking compatibility of indexes...") + compatibility_result = await show_compatibility_issues( + index_dir=index_dir, + debug=verbose + ) + # Only include essential information in the results + results["compatibility_check"] = { + "success": compatibility_result["success"], + "message": compatibility_result["message"] + } + # Include issues if they exist + if "issues" in compatibility_result: + results["compatibility_check"]["issues"] = compatibility_result["issues"] + + # Step 4: Restore indexes to target + logger.info("Step 3/5: Restoring indexes to target...") + restore_result = await restore_indexes( + uri=target_uri, + index_dir=index_dir, + skip_incompatible=skip_incompatible_indexes, + support_2dsphere=support_2dsphere, + dry_run=dry_run, + debug=verbose, + shorten_index_name=True, + skip_id_indexes=skip_id_indexes + ) + # Only include essential information in the results + results["restore_indexes"] = { + "success": restore_result["success"], + "message": restore_result["message"] + } + + if not restore_result["success"]: + logger.warning("Failed to restore some indexes to target, but continuing with migration") + + # Step 5: Run full load migration + logger.info("Step 4/5: Running full load migration...") + + # Generate boundaries for segmenting the collection + try: + (db_name, collection_name) = source_namespace.split('.', 1) + boundaries_result = await generate_boundaries( + uri=source_uri, + database=db_name, + collection=collection_name, + num_segments=4, # Use 4 segments by default + use_single_cursor=False + ) + # Only include essential information in the results + results["boundaries"] = { + "success": boundaries_result["success"], + "message": boundaries_result["message"] + } + boundaries = boundaries_result.get("boundaries_csv", None) + except Exception as e: + logger.warning(f"Failed to generate boundaries: {str(e)}") + boundaries = None + + full_load_result = await run_full_load( + source_uri=source_uri, + target_uri=target_uri, + source_namespace=source_namespace, + target_namespace=target_namespace, + boundaries=boundaries, + max_inserts_per_batch=max_inserts_per_batch, + feedback_seconds=feedback_seconds, + dry_run=dry_run, + verbose=verbose, + create_cloudwatch_metrics=create_cloudwatch_metrics, + cluster_name=cluster_name + ) + # Only include essential information in the results + results["full_load"] = { + "success": full_load_result["success"], + "message": full_load_result["message"], + "log_file": 
full_load_result.get("log_file", "") + } + + if not full_load_result["success"]: + logger.error("Failed to run full load migration") + return results + + # Step 5: Add message about CDC + logger.info("Step 5/5: Full load migration completed") + logger.info("To start CDC (Change Data Capture), use the getResumeToken tool to get a resume token, then use the runCDC tool with that token") + + logger.info("Migration workflow completed successfully") + results["success"] = True + results["message"] = "Migration workflow completed successfully" + + return results diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py index 36ed6f4..5f3a8e1 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/scripts/documentdb_index_tool.py @@ -22,6 +22,7 @@ import sys import string import random +import tempfile from bson.json_util import dumps from pymongo import MongoClient @@ -598,7 +599,7 @@ def main(): parser.add_argument('--debug',required=False,action='store_true',help='Output debugging information') parser.add_argument('--dry-run',required=False,action='store_true',help='Perform processing, but do not actually export or restore indexes') parser.add_argument('--uri',required=False,type=str,help='URI to connect to MongoDB or Amazon DocumentDB') - parser.add_argument('--dir',required=True,type=str,help='Specify the folder to export to or restore from (required)') + parser.add_argument('--dir',required=False,type=str,help='Specify the folder to export to or restore from (default: index_dump)') parser.add_argument('--show-compatible',required=False,action='store_true',dest='show_compatible',help='Output all compatible indexes with Amazon DocumentDB (no change is applied)') parser.add_argument('--show-issues',required=False,action='store_true',dest='show_issues',help='Output a report of compatibility issues found') parser.add_argument('--dump-indexes',required=False,action='store_true',help='Perform index export from the specified server') @@ -623,9 +624,16 @@ def main(): message = "Must specify one of [--dump-indexes | --restore-indexes | --show-issues | --show-compatible]" parser.error(message) - if args.dir is not None: - if not os.path.isdir(args.dir): - parser.error("--dir must specify a directory") + # Create a temporary directory if output_dir is not provided + if args.dir is None: + args.dir = tempfile.mkdtemp(prefix="index_dump_") + logging.info(f"No directory specified, created temporary directory: {args.dir}") + elif not os.path.isdir(args.dir): + try: + os.makedirs(args.dir) + logging.info(f"Created directory: {args.dir}") + except Exception as e: + parser.error(f"Failed to create directory {args.dir}: {str(e)}") if args.dump_indexes is True: if args.restore_indexes is True: diff --git a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py index 03f5e8b..b88aa45 100644 --- a/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py +++ b/mcp-server/documentdb-migration-mcp/awslabs/documentdb_migration_mcp_server/server.py @@ -20,6 +20,7 @@ from awslabs.documentdb_migration_mcp_server.index_tools import ( dump_indexes, 
restore_indexes, show_compatibility_issues, show_compatible_indexes ) +from awslabs.documentdb_migration_mcp_server.migration_workflow import run_easy_migration from loguru import logger from mcp.server.fastmcp import FastMCP @@ -30,12 +31,18 @@ instructions="""DocumentDB Migration MCP Server provides tools to migrate data to AWS DocumentDB. Usage pattern: - 1. For full load migrations, use the `runFullLoad` or `runFilteredFullLoad` tools + 1. For a complete end-to-end migration workflow, use the `runEasyMigration` tool + - This tool combines index management and full load migration in a single workflow + - It handles index compatibility checking, dumping, and restoring + - It runs a full load migration with auto-generated boundaries + - After migration is complete, you can use getResumeToken and runCDC tools for CDC + + 2. For full load migrations, use the `runFullLoad` or `runFilteredFullLoad` tools - Boundaries will be auto-generated if not provided - 2. For CDC (Change Data Capture) migrations, use the `runCDC` tool - 3. To get a change stream resume token for CDC, use the `getResumeToken` tool - 4. To generate boundaries for segmenting collections, use the `generateBoundaries` tool - 5. For index management: + 3. For CDC (Change Data Capture) migrations, use the `runCDC` tool + 4. To get a change stream resume token for CDC, use the `getResumeToken` tool + 5. To generate boundaries for segmenting collections, use the `generateBoundaries` tool + 6. For index management: - To dump indexes from a source database, use the `dumpIndexes` tool - To restore indexes to a target database, use the `restoreIndexes` tool - To check index compatibility with DocumentDB, use the `showIndexCompatibilityIssues` tool @@ -71,6 +78,9 @@ mcp.tool(name='showIndexCompatibilityIssues')(show_compatibility_issues) mcp.tool(name='showCompatibleIndexes')(show_compatible_indexes) +# Workflow tools +mcp.tool(name='runEasyMigration')(run_easy_migration) + def main(): """Run the MCP server with CLI argument support.""" From 89793692770afb954c5752d6c2525ddfddc0623d Mon Sep 17 00:00:00 2001 From: Inderpreet Singh Date: Wed, 9 Jul 2025 12:45:41 -0700 Subject: [PATCH 6/6] Update README.md --- mcp-server/documentdb-migration-mcp/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp-server/documentdb-migration-mcp/README.md b/mcp-server/documentdb-migration-mcp/README.md index a8d05ff..31f0f3b 100644 --- a/mcp-server/documentdb-migration-mcp/README.md +++ b/mcp-server/documentdb-migration-mcp/README.md @@ -244,7 +244,7 @@ Show compatible indexes with Amazon DocumentDB. - `debug`: (Optional) Output debugging information ## Requirements -- Install uv from [Astral] (https://docs.astral.sh/uv/getting-started/installation/) +- Install uv from [Astral](https://docs.astral.sh/uv/getting-started/installation/) - Python 3.10+ - PyMongo - Boto3 (for CloudWatch metrics)
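PATCH 5 applies the same log-file convention in cdc_tools.py, full_load_tools.py, and index_tools.py: prefer a `~/.documentdb-migration/logs` directory, fall back to a temporary directory when the home directory is not writable, and send subprocess output to a timestamped file. A consolidated helper capturing that convention might look like the sketch below; the helper name `open_migration_log` and the example command are hypothetical and not part of the patch series.

```python
# Sketch of the shared logging convention used throughout PATCH 5.
# The helper name and the example command are hypothetical.
import os
import subprocess
import tempfile
from datetime import datetime

from loguru import logger


def open_migration_log(prefix: str):
    """Return (file_handle, path) for a timestamped migration log file."""
    try:
        # Preferred location, as in cdc_tools.py / full_load_tools.py.
        log_dir = os.path.join(os.path.expanduser("~"), ".documentdb-migration", "logs")
        os.makedirs(log_dir, exist_ok=True)
    except Exception as exc:
        # Fall back to a temporary directory if the home directory is unusable.
        logger.warning(f"Could not create log directory in home directory: {exc}")
        log_dir = tempfile.mkdtemp(prefix="documentdb_migration_logs_")

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file_path = os.path.join(log_dir, f"{prefix}_{timestamp}.log")
    logger.info(f"Logging output to: {log_file_path}")
    return open(log_file_path, "w"), log_file_path


if __name__ == "__main__":
    # Example: run a (placeholder) command with output redirected to the log file.
    log_file, log_path = open_migration_log("full_load")
    process = subprocess.Popen(
        ["python3", "--version"],  # placeholder for the real migration script command
        stdout=log_file,
        stderr=log_file,
        text=True,
    )
    process.wait()
    log_file.close()
    print(f"Output written to {log_path}")
```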