Skip to content

Commit 115fed2

Browse files
committed
Reverted data client changes and fixed mutate_rows implementation
1 parent 9d83a7b commit 115fed2

File tree

5 files changed

+135
-141
lines changed

5 files changed

+135
-141
lines changed

google/cloud/bigtable/data/_async/client.py

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,33 +1556,6 @@ async def mutate_row(
15561556
exception_factory=_retry_exception_factory,
15571557
)
15581558

1559-
@CrossSync.convert
1560-
def _get_mutate_rows_operation(
1561-
self,
1562-
mutation_entries: list[RowMutationEntry],
1563-
*,
1564-
operation_timeout: float | TABLE_DEFAULT,
1565-
attempt_timeout: float | None | TABLE_DEFAULT,
1566-
retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT,
1567-
) -> CrossSync._MutateRowsOperation:
1568-
"""
1569-
Gets the bulk mutate rows operation object for the given mutation entries.
1570-
"""
1571-
operation_timeout, attempt_timeout = _get_timeouts(
1572-
operation_timeout, attempt_timeout, self
1573-
)
1574-
retryable_excs = _get_retryable_errors(retryable_errors, self)
1575-
1576-
operation = CrossSync._MutateRowsOperation(
1577-
self.client._gapic_client,
1578-
self,
1579-
mutation_entries,
1580-
operation_timeout,
1581-
attempt_timeout,
1582-
retryable_exceptions=retryable_excs,
1583-
)
1584-
return operation
1585-
15861559
@CrossSync.convert
15871560
async def bulk_mutate_rows(
15881561
self,
@@ -1624,11 +1597,18 @@ async def bulk_mutate_rows(
16241597
Contains details about any failed entries in .exceptions
16251598
ValueError: if invalid arguments are provided
16261599
"""
1627-
operation = self._get_mutate_rows_operation(
1600+
operation_timeout, attempt_timeout = _get_timeouts(
1601+
operation_timeout, attempt_timeout, self
1602+
)
1603+
retryable_excs = _get_retryable_errors(retryable_errors, self)
1604+
1605+
operation = CrossSync._MutateRowsOperation(
1606+
self.client._gapic_client,
1607+
self,
16281608
mutation_entries,
1629-
operation_timeout=operation_timeout,
1630-
attempt_timeout=attempt_timeout,
1631-
retryable_errors=retryable_errors,
1609+
operation_timeout,
1610+
attempt_timeout,
1611+
retryable_exceptions=retryable_excs,
16321612
)
16331613
await operation.start()
16341614

google/cloud/bigtable/data/_sync_autogen/client.py

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,29 +1299,6 @@ def mutate_row(
12991299
exception_factory=_retry_exception_factory,
13001300
)
13011301

1302-
def _get_mutate_rows_operation(
1303-
self,
1304-
mutation_entries: list[RowMutationEntry],
1305-
*,
1306-
operation_timeout: float | TABLE_DEFAULT,
1307-
attempt_timeout: float | None | TABLE_DEFAULT,
1308-
retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT,
1309-
) -> CrossSync._Sync_Impl._MutateRowsOperation:
1310-
"""Gets the bulk mutate rows operation object for the given mutation entries."""
1311-
(operation_timeout, attempt_timeout) = _get_timeouts(
1312-
operation_timeout, attempt_timeout, self
1313-
)
1314-
retryable_excs = _get_retryable_errors(retryable_errors, self)
1315-
operation = CrossSync._Sync_Impl._MutateRowsOperation(
1316-
self.client._gapic_client,
1317-
self,
1318-
mutation_entries,
1319-
operation_timeout,
1320-
attempt_timeout,
1321-
retryable_exceptions=retryable_excs,
1322-
)
1323-
return operation
1324-
13251302
def bulk_mutate_rows(
13261303
self,
13271304
mutation_entries: list[RowMutationEntry],
@@ -1360,11 +1337,17 @@ def bulk_mutate_rows(
13601337
MutationsExceptionGroup: if one or more mutations fails
13611338
Contains details about any failed entries in .exceptions
13621339
ValueError: if invalid arguments are provided"""
1363-
operation = self._get_mutate_rows_operation(
1340+
(operation_timeout, attempt_timeout) = _get_timeouts(
1341+
operation_timeout, attempt_timeout, self
1342+
)
1343+
retryable_excs = _get_retryable_errors(retryable_errors, self)
1344+
operation = CrossSync._Sync_Impl._MutateRowsOperation(
1345+
self.client._gapic_client,
1346+
self,
13641347
mutation_entries,
1365-
operation_timeout=operation_timeout,
1366-
attempt_timeout=attempt_timeout,
1367-
retryable_errors=retryable_errors,
1348+
operation_timeout,
1349+
attempt_timeout,
1350+
retryable_exceptions=retryable_excs,
13681351
)
13691352
operation.start()
13701353

google/cloud/bigtable/row.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None)
162162
:meth:`_get_mutations`.
163163
"""
164164
if timestamp is None or timestamp == mutations._SERVER_SIDE_TIMESTAMP:
165+
# Preserve special-case values (client side timestamp generation or server side timestamp)
165166
timestamp_micros = timestamp
166167
else:
167168
timestamp_micros = _microseconds_from_datetime(timestamp)

google/cloud/bigtable/table.py

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
from google.cloud.bigtable.column_family import _gc_rule_from_pb
3232
from google.cloud.bigtable.column_family import ColumnFamily
3333
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
34-
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
34+
from google.cloud.bigtable.data.exceptions import (
35+
RetryExceptionGroup,
36+
MutationsExceptionGroup,
37+
)
3538
from google.cloud.bigtable.data.mutations import RowMutationEntry
3639
from google.cloud.bigtable.batcher import MutationsBatcher
3740
from google.cloud.bigtable.batcher import FLUSH_COUNT, MAX_MUTATION_SIZE
@@ -740,18 +743,20 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
740743
if timeout is DEFAULT:
741744
timeout = self.mutation_timeout
742745

743-
# To adhere to the retry strategy of do-nothing being achievable with a deadline
744-
# of 0.0, we modify the retryable errors to be empty if such a deadline is passed.
745746
retryable_errors = RETRYABLE_MUTATION_ERRORS
746-
operation_timeout = retry.deadline
747747

748748
# The data client cannot take in zero or null values for deadline, so we set it to
749-
# the default if that is the case. It shouldn't affect the behavior of the retry
750-
# if a 0.0 deadline is set.
751-
if not retry.deadline:
749+
# the default if that is the case.
750+
if retry.deadline is None:
752751
operation_timeout = TABLE_DEFAULT.MUTATE_ROWS
753-
if retry.deadline == 0.0:
754-
retryable_errors = []
752+
753+
# To adhere to the retry strategy of do-nothing being achievable with a deadline
754+
# of 0.0, we modify the retryable errors to be empty if such a deadline is passed.
755+
elif retry.deadline == 0:
756+
operation_timeout = TABLE_DEFAULT.MUTATE_ROWS
757+
retryable_errors = []
758+
else:
759+
operation_timeout = retry.deadline
755760

756761
attempt_timeout = timeout
757762
mutation_entries = [
@@ -761,40 +766,50 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT):
761766
mutation_entries
762767
) # By default, return status OKs for everything
763768

764-
operation = self._table_impl._get_mutate_rows_operation(
765-
mutation_entries,
766-
operation_timeout=operation_timeout,
767-
attempt_timeout=attempt_timeout,
768-
retryable_errors=retryable_errors,
769-
)
770-
771769
try:
772-
operation.start()
773-
except MutationsExceptionGroup:
774-
# Take the first exception for each error index with a gRPC status code
775-
# and set the status of that row entry to that grpc status. if none of the
776-
# errors for a given index have gRPC status codes, return an UNKNOWN status
777-
# with the first error message of each index.
778-
for idx, errors in operation.errors.items():
779-
return_statuses[idx] = status_pb2.Status(
780-
code=code_pb2.Code.UNKNOWN,
781-
message=str(errors[0]),
782-
)
783-
784-
for error in errors:
785-
if (
786-
isinstance(error, GoogleAPICallError)
787-
and error.grpc_status_code is not None
788-
):
789-
return_statuses[idx] = status_pb2.Status(
790-
code=error.grpc_status_code.value[0],
791-
message=error.message,
792-
details=error.details,
793-
)
794-
break
770+
self._table_impl.bulk_mutate_rows(
771+
mutation_entries,
772+
operation_timeout=operation_timeout,
773+
attempt_timeout=attempt_timeout,
774+
retryable_errors=retryable_errors,
775+
)
776+
except MutationsExceptionGroup as mut_exc_group:
777+
# We exception handle as follows:
778+
#
779+
# 1. Each exception in the error group is a FailedMutationEntryError, and its
780+
# cause is either a singular exception or a RetryExceptionGroup consisting of
781+
# multiple exceptions.
782+
#
783+
# 2. In the case of a singular exception, if the error does not have a gRPC status
784+
# code, we return a status code of UNKNOWN.
785+
#
786+
# 3. In the case of a RetryExceptionGroup, we use terminal exception in the exception
787+
# group and process that.
788+
for error in mut_exc_group.exceptions:
789+
cause = error.__cause__
790+
if isinstance(cause, RetryExceptionGroup):
791+
return_statuses[error.index] = self._get_status(
792+
cause.exceptions[-1]
793+
)
794+
else:
795+
return_statuses[error.index] = self._get_status(cause)
795796

796797
return return_statuses
797798

799+
@staticmethod
800+
def _get_status(error):
801+
if isinstance(error, GoogleAPICallError) and error.grpc_status_code is not None:
802+
return status_pb2.Status(
803+
code=error.grpc_status_code.value[0],
804+
message=error.message,
805+
details=error.details,
806+
)
807+
808+
return status_pb2.Status(
809+
code=code_pb2.Code.UNKNOWN,
810+
message=str(error),
811+
)
812+
798813
def sample_row_keys(self):
799814
"""Read a sample of row keys in the table.
800815

0 commit comments

Comments
 (0)