Skip to content

Commit 58f6c91

Browse files
chore(samples): add sample for AddToCell mutation (#1285)
Replaces googleapis/python-bigtable#1231 Fixes b/452032333 Adds a sample for the new AddToCell incrementation mutation --------- Co-authored-by: Kasia Strzałkowska <strzalkowska@google.com>
1 parent 257b874 commit 58f6c91

File tree

3 files changed

+82
-6
lines changed

3 files changed

+82
-6
lines changed

packages/google-cloud-bigtable/samples/snippets/data_client/data_client_snippets_async.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,47 @@ async def write_conditional(project_id, instance_id, table_id):
136136
await write_conditional(table.client.project, table.instance_id, table.table_id)
137137

138138

139+
async def write_aggregate(table):
140+
# [START bigtable_async_write_aggregate]
141+
import time
142+
from google.cloud.bigtable.data import BigtableDataClientAsync
143+
from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry
144+
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
145+
146+
async def write_aggregate(project_id, instance_id, table_id):
147+
"""Increments a value in a Bigtable table using AddToCell mutation."""
148+
async with BigtableDataClientAsync(project=project_id) as client:
149+
table = client.get_table(instance_id, table_id)
150+
row_key = "unique_device_ids_1"
151+
try:
152+
async with table.mutations_batcher() as batcher:
153+
# The AddToCell mutation increments the value of a cell.
154+
# The `counters` family must be set up to be an aggregate
155+
# family with an int64 input type.
156+
reading = AddToCell(
157+
family="counters",
158+
qualifier="odometer",
159+
value=32304,
160+
# Convert nanoseconds to microseconds
161+
timestamp_micros=time.time_ns() // 1000,
162+
)
163+
await batcher.append(
164+
RowMutationEntry(row_key.encode("utf-8"), [reading])
165+
)
166+
except MutationsExceptionGroup as e:
167+
# MutationsExceptionGroup contains a FailedMutationEntryError for
168+
# each mutation that failed.
169+
for sub_exception in e.exceptions:
170+
failed_entry: RowMutationEntry = sub_exception.entry
171+
cause: Exception = sub_exception.__cause__
172+
print(
173+
f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}"
174+
)
175+
176+
# [END bigtable_async_write_aggregate]
177+
await write_aggregate(table.client.project, table.instance_id, table.table_id)
178+
179+
139180
async def read_row(table):
140181
# [START bigtable_async_reads_row]
141182
from google.cloud.bigtable.data import BigtableDataClientAsync

packages/google-cloud-bigtable/samples/snippets/data_client/data_client_snippets_async_test.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,26 @@
2525

2626

2727
@pytest.fixture(scope="session")
28-
def table_id():
29-
with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, {"family": None, "stats_summary": None}):
28+
def column_family_config():
29+
from google.cloud.bigtable_admin_v2 import types
30+
31+
int_aggregate_type = types.Type.Aggregate(
32+
input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}),
33+
sum={},
34+
)
35+
36+
return {
37+
"family": types.ColumnFamily(),
38+
"stats_summary": types.ColumnFamily(),
39+
"counters": types.ColumnFamily(
40+
value_type=types.Type(aggregate_type=int_aggregate_type)
41+
),
42+
}
43+
44+
45+
@pytest.fixture(scope="session")
46+
def table_id(column_family_config):
47+
with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config):
3048
yield TABLE_ID
3149

3250

@@ -59,6 +77,11 @@ async def test_write_conditional(table):
5977
await data_snippets.write_conditional(table)
6078

6179

80+
@pytest.mark.asyncio
81+
async def test_write_aggregate(table):
82+
await data_snippets.write_aggregate(table)
83+
84+
6285
@pytest.mark.asyncio
6386
async def test_read_row(table):
6487
await data_snippets.read_row(table)

packages/google-cloud-bigtable/samples/utils.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717

1818
from google.cloud import bigtable
19+
from google.cloud.bigtable.column_family import ColumnFamily
20+
from google.cloud.bigtable_admin_v2.types import ColumnFamily as ColumnFamily_pb
1921
from google.api_core import exceptions
2022
from google.api_core.retry import Retry
2123
from google.api_core.retry import if_exception_type
@@ -59,10 +61,20 @@ def create_table(project, instance_id, table_id, column_families={}):
5961
if table.exists():
6062
table.delete()
6163

62-
kwargs = {}
63-
if column_families:
64-
kwargs["column_families"] = column_families
65-
table.create(**kwargs)
64+
# convert column families to pb if needed
65+
pb_families = {
66+
id: ColumnFamily(id, table, rule).to_pb() if not isinstance(rule, ColumnFamily_pb) else rule
67+
for (id, rule) in column_families.items()
68+
}
69+
70+
# create table using gapic layer
71+
instance._client.table_admin_client.create_table(
72+
request={
73+
"parent": instance.name,
74+
"table_id": table_id,
75+
"table": {"column_families": pb_families},
76+
}
77+
)
6678

6779
wait_for_table(table)
6880

0 commit comments

Comments
 (0)