diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 41e4460c30..20ff9f9ac1 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1._helpers import _retry from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.api_core.exceptions import InternalServerError +from google.protobuf.duration_pb2 import Duration class _BatchBase(_SessionWrapper): @@ -145,9 +146,14 @@ def _check_state(self): if self.committed is not None: raise ValueError("Batch already committed") - def commit(self, return_commit_stats=False, request_options=None): + def commit(self, max_batching_delay_ms=None, return_commit_stats=False, request_options=None): """Commit mutations to the database. + :type max_batching_delay_ms: Optional[int] + :param max_batching_delay_ms: + If present, the The amount of latency this request is willing to incur in order to + improve throughput. + :type return_commit_stats: bool :param return_commit_stats: If true, the response will return commit stats which can be accessed though commit_stats. @@ -182,11 +188,16 @@ def commit(self, return_commit_stats=False, request_options=None): # Request tags are not supported for commit requests. request_options.request_tag = None + Duration max_batching_delay = None + if max_batching_delay_ms is not None: + max_batching_delay.nanos = 1000000 * [max_batching_delay_ms] + request = CommitRequest( session=self._session.name, mutations=self._mutations, single_use_transaction=txn_options, return_commit_stats=return_commit_stats, + max_batching_delay=max_batching_delay, request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index d564d0d488..c4909ddc1b 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -36,6 +36,7 @@ from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError +from google.protobuf.duration_pb2 import Duration class Transaction(_SnapshotBase, _BatchBase): @@ -180,9 +181,14 @@ def rollback(self): self.rolled_back = True del self._session._transaction - def commit(self, return_commit_stats=False, request_options=None): + def commit(self, max_batching_delay_ms=None, return_commit_stats=False, request_options=None): """Commit mutations to the database. + :type max_batching_delay_ms: Optional[int] + :param max_batching_delay_ms: + If present, the The amount of latency this request is willing to incur in order to + improve throughput. + :type return_commit_stats: bool :param return_commit_stats: If true, the response will return commit stats which can be accessed though commit_stats. @@ -223,11 +229,16 @@ def commit(self, return_commit_stats=False, request_options=None): # Request tags are not supported for commit requests. request_options.request_tag = None + Duration max_batching_delay = None + if max_batching_delay_ms is not None: + max_batching_delay.nanos = 1000000 * [max_batching_delay_ms] + request = CommitRequest( session=self._session.name, mutations=self._mutations, transaction_id=self._transaction_id, return_commit_stats=return_commit_stats, + max_batching_delay=max_batching_delay, request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes):