Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/api/sagemaker_mlops.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,12 @@ Local Development
:members:
:undoc-members:
:show-inheritance:


Feature Store
-------------

.. automodule:: sagemaker.mlops.feature_store
:members:
:undoc-members:
:show-inheritance:
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies = [
train = ["sagemaker-train"]
serve = ["sagemaker-serve"]
mlops = ["sagemaker-mlops"]
feature-processor = ["sagemaker-mlops", "pyspark==3.3.2", "sagemaker-feature-store-pyspark-3.3", "setuptools<82"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feature-processor used to require an extra pip install (`pip install sagemaker[feature-processor]`). This is because it requires extra dependencies that don't make sense to require all sagemaker users to install (docs).

V2 requirements: https://github.com/aws/sagemaker-python-sdk/blob/master-v2/requirements/extras/feature-processor_requirements.txt
The extra `setuptools<82` pin is needed because setuptools v82 removed the `pkg_resources` module: https://setuptools.pypa.io/en/latest/history.html#v82-0-0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feature-processor is functionality within sagemaker-mlops.

From our tenet, we are creating namespaces for higher-level flows such as training, inference, and mlops.

Can this be within sagemaker-mlops?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can, but do we want everyone doing `pip install sagemaker-mlops` to install pyspark?

Also if we move it to sagemaker-mlops we would need to update all docs that reference installing sagemaker[feature-processor]

all = ["sagemaker-train", "sagemaker-serve", "sagemaker-mlops"]

[project.urls]
Expand Down
3 changes: 2 additions & 1 deletion requirements/extras/test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ scipy
omegaconf
graphene
typing_extensions>=4.9.0
tensorflow>=2.16.2,<=2.19.0
tensorflow>=2.16.2,<=2.19.0
build
5 changes: 5 additions & 0 deletions sagemaker-mlops/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ test = [
"pytest",
"pytest-cov",
"mock",
"setuptools<82",
"pyspark==3.3.2",
"sagemaker-feature-store-pyspark-3.3",
"pandas<3.0",
"numpy<3.0",
]
dev = [
"pytest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Exported classes for the sagemaker.mlops.feature_store.feature_processor module."""
from __future__ import absolute_import

from sagemaker.mlops.feature_store.feature_processor._data_source import ( # noqa: F401
CSVDataSource,
FeatureGroupDataSource,
ParquetDataSource,
BaseDataSource,
PySparkDataSource,
)
from sagemaker.mlops.feature_store.feature_processor._exceptions import ( # noqa: F401
IngestionError,
)
from sagemaker.mlops.feature_store.feature_processor.feature_processor import ( # noqa: F401
feature_processor,
)
from sagemaker.mlops.feature_store.feature_processor.feature_scheduler import ( # noqa: F401
to_pipeline,
schedule,
describe,
put_trigger,
delete_trigger,
enable_trigger,
disable_trigger,
delete_schedule,
list_pipelines,
execute,
TransformationCode,
FeatureProcessorPipelineEvents,
)
from sagemaker.mlops.feature_store.feature_processor._enums import ( # noqa: F401
FeatureProcessorPipelineExecutionStatus,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Contains classes for preparing and uploading configs for a scheduled feature processor."""
from __future__ import absolute_import
from typing import Callable, Dict, Optional, Tuple, List, Union

import attr

from sagemaker.core.helper.session_helper import Session
from sagemaker.mlops.feature_store.feature_processor._constants import (
SPARK_JAR_FILES_PATH,
SPARK_PY_FILES_PATH,
SPARK_FILES_PATH,
S3_DATA_DISTRIBUTION_TYPE,
)
from sagemaker.core.inputs import TrainingInput
from sagemaker.core.shapes import Channel, DataSource, S3DataSource
from sagemaker.core.remote_function.core.stored_function import StoredFunction
from sagemaker.core.remote_function.job import (
_prepare_and_upload_workspace,
_prepare_and_upload_runtime_scripts,
_JobSettings,
RUNTIME_SCRIPTS_CHANNEL_NAME,
REMOTE_FUNCTION_WORKSPACE,
SPARK_CONF_CHANNEL_NAME,
_prepare_and_upload_spark_dependent_files,
)
from sagemaker.core.remote_function.runtime_environment.runtime_environment_manager import (
RuntimeEnvironmentManager,
)
from sagemaker.core.remote_function.spark_config import SparkConfig
from sagemaker.core.remote_function.custom_file_filter import CustomFileFilter
from sagemaker.core.s3 import s3_path_join


@attr.s
class ConfigUploader:
    """Prepares and uploads customer provided configs to S3"""

    # Remote-decorator job settings (S3 KMS key, spark config, dependencies, etc.).
    remote_decorator_config: _JobSettings = attr.ib()
    # Manager used to snapshot the local runtime environment's dependencies.
    runtime_env_manager: RuntimeEnvironmentManager = attr.ib()

    @staticmethod
    def _build_file_channel(name: str, s3_uri: str) -> Channel:
        """Build a File-mode input Channel for an S3 prefix.

        All channels created by this uploader share the same shape: an
        S3Prefix data source replicated to every instance.
        """
        return Channel(
            channel_name=name,
            data_source=DataSource(
                s3_data_source=S3DataSource(
                    s3_uri=s3_uri,
                    s3_data_type="S3Prefix",
                    s3_data_distribution_type=S3_DATA_DISTRIBUTION_TYPE,
                )
            ),
            input_mode="File",
        )

    def prepare_step_input_channel_for_spark_mode(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> Tuple[List[Channel], Dict]:
        """Prepares input channels for SageMaker Pipeline Step.

        Uploads the serialized callable, runtime bootstrap scripts, the user
        workspace (local dependencies / pre-execution scripts), and any Spark
        dependent files to S3, then assembles the corresponding input channels.

        Returns:
            Tuple of (List[Channel], spark_dependency_paths dict)
        """
        config = self.remote_decorator_config

        # Upload order matters: the callable and scripts must exist in S3
        # before the channels referencing those prefixes are consumed.
        self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session)
        runtime_scripts_uri = self._prepare_and_upload_runtime_scripts(
            config.spark_config,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
        )
        local_dependencies = self.runtime_env_manager.snapshot(config.dependencies)
        workspace_uri = self._prepare_and_upload_workspace(
            local_dependencies,
            config.include_local_workdir,
            config.pre_execution_commands,
            config.pre_execution_script,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
            config.custom_file_filter,
        )

        (
            jar_s3_paths,
            py_file_s3_paths,
            file_s3_path,
            spark_conf_s3_uri,
        ) = self._prepare_and_upload_spark_dependent_files(
            config.spark_config,
            s3_base_uri,
            config.s3_kms_key,
            sagemaker_session,
        )

        channels = [
            self._build_file_channel(RUNTIME_SCRIPTS_CHANNEL_NAME, runtime_scripts_uri)
        ]
        if workspace_uri:
            channels.append(
                self._build_file_channel(
                    REMOTE_FUNCTION_WORKSPACE,
                    s3_path_join(s3_base_uri, REMOTE_FUNCTION_WORKSPACE),
                )
            )
        if spark_conf_s3_uri:
            channels.append(
                self._build_file_channel(SPARK_CONF_CHANNEL_NAME, spark_conf_s3_uri)
            )

        spark_dependency_paths = {
            SPARK_JAR_FILES_PATH: jar_s3_paths,
            SPARK_PY_FILES_PATH: py_file_s3_paths,
            SPARK_FILES_PATH: file_s3_path,
        }
        return channels, spark_dependency_paths

    def _prepare_and_upload_callable(
        self, func: Callable, s3_base_uri: str, sagemaker_session: Session
    ) -> None:
        """Prepares and uploads callable to S3"""
        StoredFunction(
            sagemaker_session=sagemaker_session,
            s3_base_uri=s3_base_uri,
            s3_kms_key=self.remote_decorator_config.s3_kms_key,
        ).save(func)

    def _prepare_and_upload_workspace(
        self,
        local_dependencies_path: str,
        include_local_workdir: bool,
        pre_execution_commands: List[str],
        pre_execution_script_local_path: str,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
        custom_file_filter: Optional[Union[Callable[[str, List], List], CustomFileFilter]] = None,
    ) -> str:
        """Upload the training step dependencies to S3 if present"""
        # Thin delegation to the shared remote-function helper.
        return _prepare_and_upload_workspace(
            local_dependencies_path=local_dependencies_path,
            include_local_workdir=include_local_workdir,
            pre_execution_commands=pre_execution_commands,
            pre_execution_script_local_path=pre_execution_script_local_path,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
            custom_file_filter=custom_file_filter,
        )

    def _prepare_and_upload_runtime_scripts(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> str:
        """Copy runtime scripts to a folder and upload to S3"""
        # Thin delegation to the shared remote-function helper.
        return _prepare_and_upload_runtime_scripts(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )

    def _prepare_and_upload_spark_dependent_files(
        self,
        spark_config: SparkConfig,
        s3_base_uri: str,
        s3_kms_key: str,
        sagemaker_session: Session,
    ) -> Tuple:
        """Upload the spark dependencies to S3 if present"""
        # Nothing to upload when the job is not configured for Spark.
        if not spark_config:
            return None, None, None, None

        return _prepare_and_upload_spark_dependent_files(
            spark_config=spark_config,
            s3_base_uri=s3_base_uri,
            s3_kms_key=s3_kms_key,
            sagemaker_session=sagemaker_session,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Module containing constants for feature_processor and feature_scheduler module."""
from __future__ import absolute_import

from sagemaker.core.workflow.parameters import Parameter, ParameterTypeEnum

# Default compute for feature-processor pipeline steps.
DEFAULT_INSTANCE_TYPE = "ml.m5.xlarge"
# Schedules and triggers are created enabled unless the caller says otherwise.
DEFAULT_SCHEDULE_STATE = "ENABLED"
DEFAULT_TRIGGER_STATE = "ENABLED"
UNDERSCORE = "_"
# Error codes matched when handling service exceptions.
RESOURCE_NOT_FOUND_EXCEPTION = "ResourceNotFoundException"
RESOURCE_NOT_FOUND = "ResourceNotFound"
# Name of the pipeline parameter carrying the scheduled execution time.
EXECUTION_TIME_PIPELINE_PARAMETER = "scheduled_time"
VALIDATION_EXCEPTION = "ValidationException"
# EventBridge Scheduler placeholder substituted with the invocation time at runtime.
EVENT_BRIDGE_INVOCATION_TIME = "<aws.scheduler.scheduled-time>"
SCHEDULED_TIME_PIPELINE_PARAMETER = Parameter(
    name=EXECUTION_TIME_PIPELINE_PARAMETER, parameter_type=ParameterTypeEnum.STRING
)
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # 2023-01-01T07:00:00Z
# EventBridge Scheduler FlexibleTimeWindow disabled (exact-time invocation).
NO_FLEXIBLE_TIME_WINDOW = dict(Mode="OFF")
PIPELINE_NAME_MAXIMUM_LENGTH = 80
PIPELINE_CONTEXT_TYPE = "FeatureEngineeringPipeline"
# Keys of the spark-dependency-paths dict returned by the config uploader.
SPARK_JAR_FILES_PATH = "submit_jars_s3_paths"
SPARK_PY_FILES_PATH = "submit_py_files_s3_paths"
SPARK_FILES_PATH = "submit_files_s3_path"
# Tag applied to pipelines created via feature_processor's to_pipeline.
FEATURE_PROCESSOR_TAG_KEY = "sm-fs-fe:created-from"
FEATURE_PROCESSOR_TAG_VALUE = "fp-to-pipeline"
# ARN patterns used to extract region/account/resource-name components.
FEATURE_GROUP_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):feature-group/(.*?)$"
PIPELINE_ARN_REGEX_PATTERN = r"arn:(.*?):sagemaker:(.*?):(.*?):pipeline/(.*?)$"
EVENTBRIDGE_RULE_ARN_REGEX_PATTERN = r"arn:(.*?):events:(.*?):(.*?):rule/(.*?)$"
# NOTE(review): points at a private beta bucket and a dev wheel version —
# confirm this is intentional and not shipped in a production release.
SAGEMAKER_WHL_FILE_S3_PATH = "s3://ada-private-beta/sagemaker-2.151.1.dev0-py2.py3-none-any.whl"
S3_DATA_DISTRIBUTION_TYPE = "FullyReplicated"
# Tags linking a pipeline to its lineage context names.
PIPELINE_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-context-name"
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"
# Tag keys managed by to_pipeline; callers may not set these themselves.
TO_PIPELINE_RESERVED_TAG_KEYS = [
    FEATURE_PROCESSOR_TAG_KEY,
    PIPELINE_CONTEXT_NAME_TAG_KEY,
    PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY,
]
# Skeleton EventBridge event pattern for pipeline-execution status events;
# status and pipeline ARN lists are filled in by the caller.
BASE_EVENT_PATTERN = {
    "source": ["aws.sagemaker"],
    "detail": {"currentPipelineExecutionStatus": [], "pipelineArn": []},
}
Loading
Loading