Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit af49a62

Browse files
authored
feat: Rerouted CheckAndMutateRows and ReadModifyWriteRows (#1257)
**Changes made:** - `Row` objects hold `Mutation` and `ReadModifyWriteRowRule` objects from the data client rather than raw protos. - Rerouted `ConditionalRow.commit` and `AppendRow.commit` (CheckAndMutateRows and ReadModifyWriteRows respectively) to use the data client, or more specifically, `self._table._table_impl` - Added function `DirectRow._to_mutation_pbs` for retrieving mutations in proto form for the current `MutateRows` implementation, as well as for `DirectRow.get_mutations_size`. - Removed unnecessary helper functions and tests for helper functions
1 parent f5d3152 commit af49a62

5 files changed

Lines changed: 172 additions & 301 deletions

File tree

google/cloud/bigtable/row.py

Lines changed: 56 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,13 @@
1414

1515
"""User-friendly container for Google Cloud Bigtable Row."""
1616

17-
18-
import struct
19-
2017
from google.cloud._helpers import _datetime_from_microseconds # type: ignore
2118
from google.cloud._helpers import _microseconds_from_datetime # type: ignore
2219
from google.cloud._helpers import _to_bytes # type: ignore
23-
from google.cloud.bigtable_v2.types import data as data_v2_pb2
2420

21+
from google.cloud.bigtable.data import mutations
22+
from google.cloud.bigtable.data import read_modify_write_rules as rmw_rules
2523

26-
_PACK_I64 = struct.Struct(">q").pack
2724

2825
MAX_MUTATIONS = 100000
2926
"""The maximum number of mutations that a row can accumulate."""
@@ -157,26 +154,21 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None)
157154
:param state: (Optional) The state that is passed along to
158155
:meth:`_get_mutations`.
159156
"""
160-
column = _to_bytes(column)
161-
if isinstance(value, int):
162-
value = _PACK_I64(value)
163-
value = _to_bytes(value)
164157
if timestamp is None:
165-
# Use -1 for current Bigtable server time.
166-
timestamp_micros = -1
158+
# Use current Bigtable server time.
159+
timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP
167160
else:
168161
timestamp_micros = _microseconds_from_datetime(timestamp)
169162
# Truncate to millisecond granularity.
170163
timestamp_micros -= timestamp_micros % 1000
171164

172-
mutation_val = data_v2_pb2.Mutation.SetCell(
173-
family_name=column_family_id,
174-
column_qualifier=column,
165+
mutation = mutations.SetCell(
166+
family=column_family_id,
167+
qualifier=column,
168+
new_value=value,
175169
timestamp_micros=timestamp_micros,
176-
value=value,
177170
)
178-
mutation_pb = data_v2_pb2.Mutation(set_cell=mutation_val)
179-
self._get_mutations(state).append(mutation_pb)
171+
self._get_mutations(state).append(mutation)
180172

181173
def _delete(self, state=None):
182174
"""Helper for :meth:`delete`
@@ -191,9 +183,7 @@ def _delete(self, state=None):
191183
:param state: (Optional) The state that is passed along to
192184
:meth:`_get_mutations`.
193185
"""
194-
mutation_val = data_v2_pb2.Mutation.DeleteFromRow()
195-
mutation_pb = data_v2_pb2.Mutation(delete_from_row=mutation_val)
196-
self._get_mutations(state).append(mutation_pb)
186+
self._get_mutations(state).append(mutations.DeleteAllFromRow())
197187

198188
def _delete_cells(self, column_family_id, columns, time_range=None, state=None):
199189
"""Helper for :meth:`delete_cell` and :meth:`delete_cells`.
@@ -220,33 +210,30 @@ def _delete_cells(self, column_family_id, columns, time_range=None, state=None):
220210
:param state: (Optional) The state that is passed along to
221211
:meth:`_get_mutations`.
222212
"""
223-
mutations_list = self._get_mutations(state)
224213
if columns is self.ALL_COLUMNS:
225-
mutation_val = data_v2_pb2.Mutation.DeleteFromFamily(
226-
family_name=column_family_id
214+
self._get_mutations(state).append(
215+
mutations.DeleteAllFromFamily(family_to_delete=column_family_id)
227216
)
228-
mutation_pb = data_v2_pb2.Mutation(delete_from_family=mutation_val)
229-
mutations_list.append(mutation_pb)
230217
else:
231-
delete_kwargs = {}
232-
if time_range is not None:
233-
delete_kwargs["time_range"] = time_range._to_pb()
218+
timestamps = time_range._to_dict() if time_range else {}
219+
start_timestamp_micros = timestamps.get("start_timestamp_micros")
220+
end_timestamp_micros = timestamps.get("end_timestamp_micros")
234221

235222
to_append = []
236223
for column in columns:
237224
column = _to_bytes(column)
238-
# time_range will never change if present, but the rest of
239-
# delete_kwargs will
240-
delete_kwargs.update(
241-
family_name=column_family_id, column_qualifier=column
225+
to_append.append(
226+
mutations.DeleteRangeFromColumn(
227+
family=column_family_id,
228+
qualifier=column,
229+
start_timestamp_micros=start_timestamp_micros,
230+
end_timestamp_micros=end_timestamp_micros,
231+
)
242232
)
243-
mutation_val = data_v2_pb2.Mutation.DeleteFromColumn(**delete_kwargs)
244-
mutation_pb = data_v2_pb2.Mutation(delete_from_column=mutation_val)
245-
to_append.append(mutation_pb)
246233

247234
# We don't add the mutations until all columns have been
248235
# processed without error.
249-
mutations_list.extend(to_append)
236+
self._get_mutations(state).extend(to_append)
250237

251238

252239
class DirectRow(_SetDeleteRow):
@@ -284,7 +271,7 @@ class DirectRow(_SetDeleteRow):
284271

285272
def __init__(self, row_key, table=None):
286273
super(DirectRow, self).__init__(row_key, table)
287-
self._pb_mutations = []
274+
self._mutations = []
288275

289276
def _get_mutations(self, state=None): # pylint: disable=unused-argument
290277
"""Gets the list of mutations for a given state.
@@ -299,7 +286,12 @@ def _get_mutations(self, state=None): # pylint: disable=unused-argument
299286
:rtype: list
300287
:returns: The list to add new mutations to (for the current state).
301288
"""
302-
return self._pb_mutations
289+
return self._mutations
290+
291+
def _get_mutation_pbs(self):
292+
"""Gets the list of mutation protos."""
293+
294+
return [mut._to_pb() for mut in self._get_mutations()]
303295

304296
def get_mutations_size(self):
305297
"""Gets the total mutations size for current row
@@ -313,7 +305,7 @@ def get_mutations_size(self):
313305
"""
314306

315307
mutation_size = 0
316-
for mutation in self._get_mutations():
308+
for mutation in self._get_mutation_pbs():
317309
mutation_size += mutation._pb.ByteSize()
318310

319311
return mutation_size
@@ -486,7 +478,7 @@ def clear(self):
486478
:end-before: [END bigtable_api_row_clear]
487479
:dedent: 4
488480
"""
489-
del self._pb_mutations[:]
481+
del self._mutations[:]
490482

491483

492484
class ConditionalRow(_SetDeleteRow):
@@ -597,17 +589,15 @@ def commit(self):
597589
% (MAX_MUTATIONS, num_true_mutations, num_false_mutations)
598590
)
599591

600-
data_client = self._table._instance._client.table_data_client
601-
resp = data_client.check_and_mutate_row(
602-
table_name=self._table.name,
592+
table = self._table._table_impl
593+
resp = table.check_and_mutate_row(
603594
row_key=self._row_key,
604-
predicate_filter=self._filter._to_pb(),
605-
app_profile_id=self._table._app_profile_id,
606-
true_mutations=true_mutations,
607-
false_mutations=false_mutations,
595+
predicate=self._filter,
596+
true_case_mutations=true_mutations,
597+
false_case_mutations=false_mutations,
608598
)
609599
self.clear()
610-
return resp.predicate_matched
600+
return resp
611601

612602
# pylint: disable=arguments-differ
613603
def set_cell(self, column_family_id, column, value, timestamp=None, state=True):
@@ -797,7 +787,7 @@ class AppendRow(Row):
797787

798788
def __init__(self, row_key, table):
799789
super(AppendRow, self).__init__(row_key, table)
800-
self._rule_pb_list = []
790+
self._rule_list = []
801791

802792
def clear(self):
803793
"""Removes all currently accumulated modifications on current row.
@@ -809,7 +799,7 @@ def clear(self):
809799
:end-before: [END bigtable_api_row_clear]
810800
:dedent: 4
811801
"""
812-
del self._rule_pb_list[:]
802+
del self._rule_list[:]
813803

814804
def append_cell_value(self, column_family_id, column, value):
815805
"""Appends a value to an existing cell.
@@ -842,12 +832,11 @@ def append_cell_value(self, column_family_id, column, value):
842832
the targeted cell is unset, it will be treated as
843833
containing the empty string.
844834
"""
845-
column = _to_bytes(column)
846-
value = _to_bytes(value)
847-
rule_pb = data_v2_pb2.ReadModifyWriteRule(
848-
family_name=column_family_id, column_qualifier=column, append_value=value
835+
self._rule_list.append(
836+
rmw_rules.AppendValueRule(
837+
family=column_family_id, qualifier=column, append_value=value
838+
)
849839
)
850-
self._rule_pb_list.append(rule_pb)
851840

852841
def increment_cell_value(self, column_family_id, column, int_value):
853842
"""Increments a value in an existing cell.
@@ -886,13 +875,11 @@ def increment_cell_value(self, column_family_id, column, int_value):
886875
big-endian signed integer), or the entire request
887876
will fail.
888877
"""
889-
column = _to_bytes(column)
890-
rule_pb = data_v2_pb2.ReadModifyWriteRule(
891-
family_name=column_family_id,
892-
column_qualifier=column,
893-
increment_amount=int_value,
878+
self._rule_list.append(
879+
rmw_rules.IncrementRule(
880+
family=column_family_id, qualifier=column, increment_amount=int_value
881+
)
894882
)
895-
self._rule_pb_list.append(rule_pb)
896883

897884
def commit(self):
898885
"""Makes a ``ReadModifyWriteRow`` API request.
@@ -925,7 +912,7 @@ def commit(self):
925912
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
926913
mutations exceeds the :data:`MAX_MUTATIONS`.
927914
"""
928-
num_mutations = len(self._rule_pb_list)
915+
num_mutations = len(self._rule_list)
929916
if num_mutations == 0:
930917
return {}
931918
if num_mutations > MAX_MUTATIONS:
@@ -934,12 +921,10 @@ def commit(self):
934921
"allowable %d." % (num_mutations, MAX_MUTATIONS)
935922
)
936923

937-
data_client = self._table._instance._client.table_data_client
938-
row_response = data_client.read_modify_write_row(
939-
table_name=self._table.name,
924+
table = self._table._table_impl
925+
row_response = table.read_modify_write_row(
940926
row_key=self._row_key,
941-
rules=self._rule_pb_list,
942-
app_profile_id=self._table._app_profile_id,
927+
rules=self._rule_list,
943928
)
944929

945930
# Reset modifications after commit-ing request.
@@ -983,47 +968,13 @@ def _parse_rmw_row_response(row_response):
983968
}
984969
"""
985970
result = {}
986-
for column_family in row_response.row.families:
987-
column_family_id, curr_family = _parse_family_pb(column_family)
988-
result[column_family_id] = curr_family
971+
for cell in row_response.cells:
972+
column_family = result.setdefault(cell.family, {})
973+
column = column_family.setdefault(cell.qualifier, [])
974+
column.append((cell.value, _datetime_from_microseconds(cell.timestamp_micros)))
989975
return result
990976

991977

992-
def _parse_family_pb(family_pb):
993-
"""Parses a Family protobuf into a dictionary.
994-
995-
:type family_pb: :class:`._generated.data_pb2.Family`
996-
:param family_pb: A protobuf
997-
998-
:rtype: tuple
999-
:returns: A string and dictionary. The string is the name of the
1000-
column family and the dictionary has column names (within the
1001-
family) as keys and cell lists as values. Each cell is
1002-
represented with a two-tuple with the value (in bytes) and the
1003-
timestamp for the cell. For example:
1004-
1005-
.. code:: python
1006-
1007-
{
1008-
b'col-name1': [
1009-
(b'cell-val', datetime.datetime(...)),
1010-
(b'cell-val-newer', datetime.datetime(...)),
1011-
],
1012-
b'col-name2': [
1013-
(b'altcol-cell-val', datetime.datetime(...)),
1014-
],
1015-
}
1016-
"""
1017-
result = {}
1018-
for column in family_pb.columns:
1019-
result[column.qualifier] = cells = []
1020-
for cell in column.cells:
1021-
val_pair = (cell.value, _datetime_from_microseconds(cell.timestamp_micros))
1022-
cells.append(val_pair)
1023-
1024-
return family_pb.name, result
1025-
1026-
1027978
class PartialRowData(object):
1028979
"""Representation of partial row in a Google Cloud Bigtable Table.
1029980

google/cloud/bigtable/table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,7 @@ def _compile_mutation_entries(table_name, rows):
13681368
for row in rows:
13691369
_check_row_table_name(table_name, row)
13701370
_check_row_type(row)
1371-
mutations = row._get_mutations()
1371+
mutations = row._get_mutation_pbs()
13721372
entries.append(entry_klass(row_key=row.row_key, mutations=mutations))
13731373
mutations_count += len(mutations)
13741374

0 commit comments

Comments
 (0)