Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,18 @@ def __init__(

self.persistence_store = persistence_store

def handle(self) -> Any:
def handle(self, is_replay: bool = False) -> Any:
"""
Main entry point for handling idempotent execution of a function.

Parameters
----------
is_replay : bool, optional
Whether this is a replay of a function that is already in progress.
If True, allows replay of functions that are already in progress.
If False, uses standard idempotency behavior (raises IdempotencyAlreadyInProgressError).
Defaults to False.

Returns
-------
Any
Expand All @@ -138,12 +146,12 @@ def handle(self) -> Any:
# In most cases we can retry successfully on this exception.
for i in range(MAX_RETRIES + 1): # pragma: no cover
try:
return self._process_idempotency()
return self._process_idempotency(is_replay)
except IdempotencyInconsistentStateError:
if i == MAX_RETRIES:
raise # Bubble up when exceeded max tries

def _process_idempotency(self):
def _process_idempotency(self, is_replay: bool):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
Expand All @@ -159,7 +167,8 @@ def _process_idempotency(self):
# We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
# way of retrieving the existing record after a failed conditional write operation.
record = exc.old_data_record or self._get_idempotency_record()

if is_replay:
return self._get_function_response()
# If a record is found, handle it for status
if record:
return self._handle_for_status(record)
Expand Down
22 changes: 16 additions & 6 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.typing import DurableContext, LambdaContext

from aws_lambda_powertools.warnings import PowertoolsUserWarning

Expand All @@ -37,9 +37,9 @@

@lambda_handler_decorator
def idempotent(
handler: Callable[[Any, LambdaContext], Any],
handler: Callable[[Any, LambdaContext | DurableContext], Any],
event: dict[str, Any],
context: LambdaContext,
context: LambdaContext | DurableContext,
persistence_store: BasePersistenceLayer,
config: IdempotencyConfig | None = None,
key_prefix: str | None = None,
Expand All @@ -55,7 +55,7 @@ def idempotent(
event: dict
Lambda's Event
context: dict
Lambda's Context
Lambda's Context or Durable Context
persistence_store: BasePersistenceLayer
Instance of BasePersistenceLayer to store data
config: IdempotencyConfig
Expand Down Expand Up @@ -91,7 +91,17 @@ def handler(event, context):
return handler(event, context, **kwargs)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

if hasattr(context, "state"):
# Extract lambda_context from DurableContext
durable_context = cast("DurableContext", context)
config.register_lambda_context(durable_context.lambda_context)
# Note: state.operations is accessed via duck typing at runtime
is_replay = len(durable_context.state.operations) > 1 # type: ignore[attr-defined]
else:
# Standard LambdaContext
config.register_lambda_context(context)
is_replay = False

args = event, context
idempotency_handler = IdempotencyHandler(
Expand All @@ -104,7 +114,7 @@ def handler(event, context):
function_kwargs=kwargs,
)

return idempotency_handler.handle()
return idempotency_handler.handle(is_replay=is_replay)


def idempotent_function(
Expand Down
4 changes: 2 additions & 2 deletions aws_lambda_powertools/utilities/typing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
[`Typing`](../utilities/typing.md)
"""

from .lambda_context import LambdaContext
from .lambda_context import DurableContext, LambdaContext

__all__ = ["LambdaContext"]
__all__ = ["DurableContext", "LambdaContext"]
13 changes: 13 additions & 0 deletions aws_lambda_powertools/utilities/typing/lambda_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,16 @@ def tenant_id(self) -> str | None:
def get_remaining_time_in_millis() -> int:
"""Returns the number of milliseconds left before the execution times out."""
return 0


class DurableContext:
_lambda_context: LambdaContext
_state: object

@property
def lambda_context(self) -> LambdaContext:
return self._lambda_context

@property
def state(self) -> object:
return self._state
187 changes: 187 additions & 0 deletions tests/functional/idempotency/_boto3/test_idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from aws_lambda_powertools.utilities.idempotency.serialization.dataclass import (
DataclassSerializer,
)
from aws_lambda_powertools.utilities.typing import DurableContext
from aws_lambda_powertools.utilities.validation import envelopes, validator
from aws_lambda_powertools.warnings import PowertoolsUserWarning
from tests.functional.idempotency.utils import (
Expand Down Expand Up @@ -2136,3 +2137,189 @@ def lambda_handler(record, context):
result = lambda_handler(mock_event, lambda_context)
# THEN we expect the function to execute successfully
assert result == expected_result


# Tests: Durable Functions Integration


@pytest.fixture
def durable_context_single_operation(lambda_context):
"""DurableContext with single operation (execution mode, is_replay=False)"""
durable_ctx = DurableContext()
durable_ctx._lambda_context = lambda_context
durable_ctx._state = Mock(operations=[{"id": "op1"}])
return durable_ctx


@pytest.fixture
def durable_context_multiple_operations(lambda_context):
"""DurableContext with multiple operations (replay mode, is_replay=True)"""
durable_ctx = DurableContext()
durable_ctx._lambda_context = lambda_context
durable_ctx._state = Mock(operations=[{"id": "op1"}, {"id": "op2"}])
return durable_ctx


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True)
def test_idempotent_lambda_with_durable_context_first_execution(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
durable_context_single_operation,
lambda_response,
):
"""
Test idempotent decorator with DurableContext during first execution (execution mode).

When a durable function executes for the first time (single operation in state),
is_replay=False, and the function should execute normally, saving the result.
"""
# GIVEN
stubber = stub.Stubber(persistence_store.client)
stubber.add_response("put_item", {})
stubber.add_response("update_item", {})
stubber.activate()

# WHEN
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

result = lambda_handler(lambda_apigw_event, durable_context_single_operation)

# THEN
assert result == lambda_response
stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True)
def test_idempotent_lambda_with_durable_context_during_replay(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
durable_context_multiple_operations,
timestamp_future,
lambda_response,
serialized_lambda_response,
):
"""
Test idempotent decorator with DurableContext during workflow replay (replay mode).

When a durable function replays (multiple operations in state), is_replay=True.
The function should execute once to get the response and save it, even when
an INPROGRESS record exists from a previous execution.
"""
# GIVEN
hashed_key = hash_idempotency_key(data=lambda_apigw_event)

stubber = stub.Stubber(persistence_store.client)
ddb_response = {
"Item": {
"id": {"S": hashed_key},
"expiration": {"N": timestamp_future},
"data": {"S": serialized_lambda_response},
"status": {"S": "INPROGRESS"},
},
}
stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response)
# In replay mode, function still executes once to get response, then saves it
stubber.add_response("update_item", {})
stubber.activate()

# WHEN
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

result = lambda_handler(lambda_apigw_event, durable_context_multiple_operations)

# THEN - Should return result in replay mode
assert result == lambda_response
stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True)
def test_idempotent_lambda_extracts_lambda_context_from_durable_context(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
durable_context_single_operation,
lambda_response,
):
"""
Test that idempotency properly extracts LambdaContext from DurableContext.

The @idempotent decorator should extract the wrapped lambda_context from
DurableContext for tracking remaining time and other Lambda-specific features.
"""
# GIVEN
stubber = stub.Stubber(persistence_store.client)
stubber.add_response("put_item", {})
stubber.add_response("update_item", {})
stubber.activate()

# WHEN
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
# Verify we can access lambda_context properties
assert hasattr(context, "lambda_context")
assert context.lambda_context.function_name == "test-func"
return lambda_response

result = lambda_handler(lambda_apigw_event, durable_context_single_operation)

# THEN
assert result == lambda_response
stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True)
def test_idempotent_lambda_concurrent_durable_executions_raise_in_progress_error(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
durable_context_single_operation,
lambda_response,
):
"""
Test that concurrent durable executions are prevented by IdempotencyAlreadyInProgressError.

Scenario: Two different durable function executions attempt to process the same
idempotent operation concurrently:
1. First execution creates an INPROGRESS record
2. Second execution (in execution mode, is_replay=False) finds the INPROGRESS record
3. Second execution should raise IdempotencyAlreadyInProgressError to prevent duplicate work

This ensures data consistency when multiple durable function instances execute concurrently.
"""
# GIVEN
hashed_key = hash_idempotency_key(data=lambda_apigw_event)

stubber = stub.Stubber(persistence_store.client)
# Simulate existing INPROGRESS record with far future timestamps
ddb_response = {
"Item": {
"id": {"S": hashed_key},
"expiration": {"N": "9999999999"},
"in_progress_expiration": {"N": "9999999999999"}, # Far future in milliseconds
"status": {"S": "INPROGRESS"},
},
}
stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response)
stubber.activate()

# WHEN / THEN - Should raise IdempotencyAlreadyInProgressError in execution mode
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

with pytest.raises(IdempotencyAlreadyInProgressError) as exc_info:
lambda_handler(lambda_apigw_event, durable_context_single_operation)

# Verify error message contains the idempotency key
assert hashed_key in str(exc_info.value)
stubber.assert_no_pending_responses()
stubber.deactivate()