diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index e1a7d78da40..eedc3e785f4 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -123,10 +123,18 @@ def __init__( self.persistence_store = persistence_store - def handle(self) -> Any: + def handle(self, is_replay: bool = False) -> Any: """ Main entry point for handling idempotent execution of a function. + Parameters + ---------- + is_replay : bool, optional + Whether this is a replay of a function that is already in progress. + If True, allows replay of functions that are already in progress. + If False, uses standard idempotency behavior (raises IdempotencyAlreadyInProgressError). + Defaults to False. + Returns ------- Any @@ -138,12 +146,12 @@ def handle(self) -> Any: # In most cases we can retry successfully on this exception. for i in range(MAX_RETRIES + 1): # pragma: no cover try: - return self._process_idempotency() + return self._process_idempotency(is_replay) except IdempotencyInconsistentStateError: if i == MAX_RETRIES: raise # Bubble up when exceeded max tries - def _process_idempotency(self): + def _process_idempotency(self, is_replay: bool): try: # We call save_inprogress first as an optimization for the most common case where no idempotent record # already exists. If it succeeds, there's no need to call get_record. @@ -159,7 +167,8 @@ def _process_idempotency(self): # We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective # way of retrieving the existing record after a failed conditional write operation. record = exc.old_data_record or self._get_idempotency_record() - + if is_replay: + return self._get_function_response() # If a record is found, handle it for status if record: return self._handle_for_status(record) diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index f59d7df7179..6e696a2e579 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -28,7 +28,7 @@ from aws_lambda_powertools.utilities.idempotency.persistence.base import ( BasePersistenceLayer, ) - from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.typing import DurableContext, LambdaContext from aws_lambda_powertools.warnings import PowertoolsUserWarning @@ -37,9 +37,9 @@ @lambda_handler_decorator def idempotent( - handler: Callable[[Any, LambdaContext], Any], + handler: Callable[[Any, LambdaContext | DurableContext], Any], event: dict[str, Any], - context: LambdaContext, + context: LambdaContext | DurableContext, persistence_store: BasePersistenceLayer, config: IdempotencyConfig | None = None, key_prefix: str | None = None, @@ -55,7 +55,7 @@ def idempotent( event: dict Lambda's Event context: dict - Lambda's Context + Lambda's Context or Durable Context persistence_store: BasePersistenceLayer Instance of BasePersistenceLayer to store data config: IdempotencyConfig @@ -91,7 +91,17 @@ def handler(event, context): return handler(event, context, **kwargs) config = config or IdempotencyConfig() - config.register_lambda_context(context) + + if hasattr(context, "state"): + # Extract lambda_context from DurableContext + durable_context = cast("DurableContext", context) + config.register_lambda_context(durable_context.lambda_context) + # Note: state.operations is accessed via duck typing at runtime + is_replay = len(durable_context.state.operations) > 1 # type: ignore[attr-defined] + else: + # Standard LambdaContext + config.register_lambda_context(context) + is_replay = False args = event, context idempotency_handler = IdempotencyHandler( @@ -104,7 +114,7 @@ def handler(event, context): function_kwargs=kwargs, ) - return idempotency_handler.handle() + return idempotency_handler.handle(is_replay=is_replay) def idempotent_function( diff --git a/aws_lambda_powertools/utilities/typing/__init__.py b/aws_lambda_powertools/utilities/typing/__init__.py index 22f907025fc..a28feb2c53c 100644 --- a/aws_lambda_powertools/utilities/typing/__init__.py +++ b/aws_lambda_powertools/utilities/typing/__init__.py @@ -4,6 +4,6 @@ [`Typing`](../utilities/typing.md) """ -from .lambda_context import LambdaContext +from .lambda_context import DurableContext, LambdaContext -__all__ = ["LambdaContext"] +__all__ = ["DurableContext", "LambdaContext"] diff --git a/aws_lambda_powertools/utilities/typing/lambda_context.py b/aws_lambda_powertools/utilities/typing/lambda_context.py index 49fb7044792..e501535720c 100644 --- a/aws_lambda_powertools/utilities/typing/lambda_context.py +++ b/aws_lambda_powertools/utilities/typing/lambda_context.py @@ -93,3 +93,16 @@ def tenant_id(self) -> str | None: def get_remaining_time_in_millis() -> int: """Returns the number of milliseconds left before the execution times out.""" return 0 + + +class DurableContext: + _lambda_context: LambdaContext + _state: object + + @property + def lambda_context(self) -> LambdaContext: + return self._lambda_context + + @property + def state(self) -> object: + return self._state diff --git a/tests/functional/idempotency/_boto3/test_idempotency.py b/tests/functional/idempotency/_boto3/test_idempotency.py index 17f14c2c182..202c0e5a9a5 100644 --- a/tests/functional/idempotency/_boto3/test_idempotency.py +++ b/tests/functional/idempotency/_boto3/test_idempotency.py @@ -46,6 +46,7 @@ from aws_lambda_powertools.utilities.idempotency.serialization.dataclass import ( DataclassSerializer, ) +from aws_lambda_powertools.utilities.typing import DurableContext from aws_lambda_powertools.utilities.validation import envelopes, validator from aws_lambda_powertools.warnings import PowertoolsUserWarning from tests.functional.idempotency.utils import ( @@ -2136,3 +2137,189 @@ def lambda_handler(record, context): result = lambda_handler(mock_event, lambda_context) # THEN we expect the function to execute successfully assert result == expected_result + + +# Tests: Durable Functions Integration + + +@pytest.fixture +def durable_context_single_operation(lambda_context): + """DurableContext with single operation (execution mode, is_replay=False)""" + durable_ctx = DurableContext() + durable_ctx._lambda_context = lambda_context + durable_ctx._state = Mock(operations=[{"id": "op1"}]) + return durable_ctx + + +@pytest.fixture +def durable_context_multiple_operations(lambda_context): + """DurableContext with multiple operations (replay mode, is_replay=True)""" + durable_ctx = DurableContext() + durable_ctx._lambda_context = lambda_context + durable_ctx._state = Mock(operations=[{"id": "op1"}, {"id": "op2"}]) + return durable_ctx + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_with_durable_context_first_execution( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test idempotent decorator with DurableContext during first execution (execution mode). + + When a durable function executes for the first time (single operation in state), + is_replay=False, and the function should execute normally, saving the result. + """ + # GIVEN + stubber = stub.Stubber(persistence_store.client) + stubber.add_response("put_item", {}) + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # THEN + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_with_durable_context_during_replay( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_multiple_operations, + timestamp_future, + lambda_response, + serialized_lambda_response, +): + """ + Test idempotent decorator with DurableContext during workflow replay (replay mode). + + When a durable function replays (multiple operations in state), is_replay=True. + The function should execute once to get the response and save it, even when + an INPROGRESS record exists from a previous execution. + """ + # GIVEN + hashed_key = hash_idempotency_key(data=lambda_apigw_event) + + stubber = stub.Stubber(persistence_store.client) + ddb_response = { + "Item": { + "id": {"S": hashed_key}, + "expiration": {"N": timestamp_future}, + "data": {"S": serialized_lambda_response}, + "status": {"S": "INPROGRESS"}, + }, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response) + # In replay mode, function still executes once to get response, then saves it + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_multiple_operations) + + # THEN - Should return result in replay mode + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_extracts_lambda_context_from_durable_context( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test that idempotency properly extracts LambdaContext from DurableContext. + + The @idempotent decorator should extract the wrapped lambda_context from + DurableContext for tracking remaining time and other Lambda-specific features. + """ + # GIVEN + stubber = stub.Stubber(persistence_store.client) + stubber.add_response("put_item", {}) + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + # Verify we can access lambda_context properties + assert hasattr(context, "lambda_context") + assert context.lambda_context.function_name == "test-func" + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # THEN + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_concurrent_durable_executions_raise_in_progress_error( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test that concurrent durable executions are prevented by IdempotencyAlreadyInProgressError. + + Scenario: Two different durable function executions attempt to process the same + idempotent operation concurrently: + 1. First execution creates an INPROGRESS record + 2. Second execution (in execution mode, is_replay=False) finds the INPROGRESS record + 3. Second execution should raise IdempotencyAlreadyInProgressError to prevent duplicate work + + This ensures data consistency when multiple durable function instances execute concurrently. + """ + # GIVEN + hashed_key = hash_idempotency_key(data=lambda_apigw_event) + + stubber = stub.Stubber(persistence_store.client) + # Simulate existing INPROGRESS record with far future timestamps + ddb_response = { + "Item": { + "id": {"S": hashed_key}, + "expiration": {"N": "9999999999"}, + "in_progress_expiration": {"N": "9999999999999"}, # Far future in milliseconds + "status": {"S": "INPROGRESS"}, + }, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response) + stubber.activate() + + # WHEN / THEN - Should raise IdempotencyAlreadyInProgressError in execution mode + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyAlreadyInProgressError) as exc_info: + lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # Verify error message contains the idempotency key + assert hashed_key in str(exc_info.value) + stubber.assert_no_pending_responses() + stubber.deactivate()