Skip to content

Commit 08d3812

Browse files
author
Regan Koopmans
committed
feat: support conversion of BigQuery schema to Protobuf
1 parent 3928ddc commit 08d3812

File tree

3 files changed

+722
-1
lines changed

3 files changed

+722
-1
lines changed

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# this code path once we drop support for Python 3.7
2929
import importlib_metadata as metadata
3030

31-
from google.cloud.bigquery_storage_v1 import client, types
31+
from google.cloud.bigquery_storage_v1 import client, schema, types
3232

3333

3434
class BigQueryReadClient(client.BigQueryReadClient):
@@ -140,4 +140,6 @@ def _get_version(dependency_name):
140140
# google.cloud.bigquery_storage_v1.client
141141
"BigQueryReadClient",
142142
"BigQueryWriteClient",
143+
# google.cloud.bigquery_storage_v1.schema
144+
"schema",
143145
)
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Utilities for converting BigQuery schemas to Protocol Buffer descriptors.
16+
17+
This module provides functionality to dynamically generate Protocol Buffer
18+
descriptors from BigQuery table schemas, eliminating the need to manually
19+
create and compile .proto files when using the BigQuery Storage Write API.
20+
"""
21+
22+
from typing import Dict, List, Optional
23+
24+
from google.cloud.bigquery_storage_v1 import types
25+
from google.protobuf import descriptor_pb2
26+
27+
28+
# Mapping from BigQuery types to Protocol Buffer field types
29+
_BQ_TO_PROTO_TYPE_MAP: Dict[types.TableFieldSchema.Type, int] = {
30+
types.TableFieldSchema.Type.STRING: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
31+
types.TableFieldSchema.Type.INT64: descriptor_pb2.FieldDescriptorProto.TYPE_INT64,
32+
types.TableFieldSchema.Type.BOOL: descriptor_pb2.FieldDescriptorProto.TYPE_BOOL,
33+
types.TableFieldSchema.Type.BYTES: descriptor_pb2.FieldDescriptorProto.TYPE_BYTES,
34+
types.TableFieldSchema.Type.DOUBLE: descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE,
35+
# DATE is represented as days since epoch
36+
types.TableFieldSchema.Type.DATE: descriptor_pb2.FieldDescriptorProto.TYPE_INT32,
37+
# DATETIME is represented as a formatted string
38+
types.TableFieldSchema.Type.DATETIME: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
39+
# TIME is represented as a formatted string
40+
types.TableFieldSchema.Type.TIME: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
41+
# TIMESTAMP is represented as microseconds since epoch
42+
types.TableFieldSchema.Type.TIMESTAMP: descriptor_pb2.FieldDescriptorProto.TYPE_INT64,
43+
# NUMERIC and BIGNUMERIC are represented as strings
44+
types.TableFieldSchema.Type.NUMERIC: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
45+
types.TableFieldSchema.Type.BIGNUMERIC: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
46+
# GEOGRAPHY is represented as WKT string
47+
types.TableFieldSchema.Type.GEOGRAPHY: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
48+
# JSON is represented as a string
49+
types.TableFieldSchema.Type.JSON: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
50+
# INTERVAL is represented as a string
51+
types.TableFieldSchema.Type.INTERVAL: descriptor_pb2.FieldDescriptorProto.TYPE_STRING,
52+
# STRUCT and RANGE use TYPE_MESSAGE and are handled specially
53+
types.TableFieldSchema.Type.STRUCT: descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
54+
types.TableFieldSchema.Type.RANGE: descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE,
55+
}
56+
57+
58+
def _get_field_label(mode: types.TableFieldSchema.Mode) -> int:
59+
"""Convert BigQuery field mode to Protocol Buffer field label.
60+
61+
Args:
62+
mode: The BigQuery field mode (NULLABLE, REQUIRED, or REPEATED).
63+
64+
Returns:
65+
The corresponding Protocol Buffer field label constant.
66+
"""
67+
if mode == types.TableFieldSchema.Mode.REQUIRED:
68+
return descriptor_pb2.FieldDescriptorProto.LABEL_REQUIRED
69+
elif mode == types.TableFieldSchema.Mode.REPEATED:
70+
return descriptor_pb2.FieldDescriptorProto.LABEL_REPEATED
71+
else: # NULLABLE or MODE_UNSPECIFIED
72+
return descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
73+
74+
75+
def _create_range_descriptor(
76+
field_name: str,
77+
element_type: types.TableFieldSchema.Type,
78+
) -> descriptor_pb2.DescriptorProto:
79+
"""Create a nested message descriptor for a RANGE field.
80+
81+
Args:
82+
field_name: The name of the RANGE field.
83+
element_type: The element type of the RANGE (e.g., DATE, DATETIME, TIMESTAMP).
84+
85+
Returns:
86+
A DescriptorProto representing the RANGE structure with start and end fields.
87+
"""
88+
range_descriptor = descriptor_pb2.DescriptorProto()
89+
range_descriptor.name = f"{field_name}_Range"
90+
91+
# Get the proto type for the element
92+
element_proto_type = _BQ_TO_PROTO_TYPE_MAP.get(element_type)
93+
if element_proto_type is None:
94+
raise ValueError(
95+
f"Unsupported element type '{element_type}' for RANGE field '{field_name}'. "
96+
f"Supported types are DATE, DATETIME, and TIMESTAMP."
97+
)
98+
99+
# Add 'start' field
100+
start_field = range_descriptor.field.add()
101+
start_field.name = "start"
102+
start_field.number = 1
103+
start_field.type = element_proto_type
104+
start_field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
105+
106+
# Add 'end' field
107+
end_field = range_descriptor.field.add()
108+
end_field.name = "end"
109+
end_field.number = 2
110+
end_field.type = element_proto_type
111+
end_field.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL
112+
113+
return range_descriptor
114+
115+
116+
def _convert_fields_to_proto(
117+
fields: List[types.TableFieldSchema],
118+
) -> tuple[List[descriptor_pb2.FieldDescriptorProto], List[descriptor_pb2.DescriptorProto]]:
119+
"""Convert BigQuery fields to Protocol Buffer field descriptors.
120+
121+
Args:
122+
fields: List of BigQuery table field schemas.
123+
124+
Returns:
125+
A tuple of (field_descriptors, nested_descriptors):
126+
- field_descriptors: List of FieldDescriptorProto objects
127+
- nested_descriptors: List of nested DescriptorProto objects for STRUCT/RANGE types
128+
"""
129+
field_descriptors = []
130+
nested_descriptors = []
131+
132+
for field_number, bq_field in enumerate(fields, start=1):
133+
field_descriptor = descriptor_pb2.FieldDescriptorProto()
134+
field_descriptor.name = bq_field.name
135+
field_descriptor.number = field_number
136+
137+
# Handle STRUCT fields (nested messages)
138+
if bq_field.type_ == types.TableFieldSchema.Type.STRUCT:
139+
field_descriptor.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE
140+
141+
# Create nested message descriptor
142+
nested_descriptor = descriptor_pb2.DescriptorProto()
143+
nested_descriptor.name = f"{bq_field.name}_Struct"
144+
145+
# Recursively convert nested fields
146+
nested_fields, deeply_nested = _convert_fields_to_proto(
147+
bq_field.fields
148+
)
149+
150+
for nested_field in nested_fields:
151+
nested_descriptor.field.append(nested_field)
152+
153+
for deeply_nested_type in deeply_nested:
154+
nested_descriptor.nested_type.append(deeply_nested_type)
155+
156+
nested_descriptors.append(nested_descriptor)
157+
field_descriptor.type_name = nested_descriptor.name
158+
159+
# Handle RANGE fields
160+
elif bq_field.type_ == types.TableFieldSchema.Type.RANGE:
161+
field_descriptor.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE
162+
163+
# Get the element type from range_element_type
164+
if not bq_field.range_element_type or not bq_field.range_element_type.type_:
165+
raise ValueError(
166+
f"RANGE field '{bq_field.name}' is missing range_element_type. "
167+
f"RANGE fields must specify an element type (DATE, DATETIME, or TIMESTAMP)."
168+
)
169+
element_type = bq_field.range_element_type.type_
170+
171+
range_descriptor = _create_range_descriptor(bq_field.name, element_type)
172+
nested_descriptors.append(range_descriptor)
173+
field_descriptor.type_name = range_descriptor.name
174+
175+
# Handle primitive types
176+
else:
177+
proto_type = _BQ_TO_PROTO_TYPE_MAP.get(bq_field.type_)
178+
if proto_type is None:
179+
raise ValueError(
180+
f"Unsupported BigQuery type: {bq_field.type_} for field {bq_field.name}"
181+
)
182+
field_descriptor.type = proto_type
183+
184+
# Set field label based on mode
185+
mode = bq_field.mode or types.TableFieldSchema.Mode.NULLABLE
186+
field_descriptor.label = _get_field_label(mode)
187+
188+
field_descriptors.append(field_descriptor)
189+
190+
return field_descriptors, nested_descriptors
191+
192+
193+
def table_schema_to_proto_descriptor(
194+
table_schema: types.TableSchema,
195+
message_name: str = "TableRow",
196+
) -> descriptor_pb2.DescriptorProto:
197+
"""Convert a BigQuery TableSchema to a Protocol Buffer DescriptorProto.
198+
199+
This function generates a Protocol Buffer descriptor that can be used with
200+
the BigQuery Storage Write API without needing to create and compile .proto
201+
files. The generated descriptor uses proto2 wire format, which is required
202+
by the Write API.
203+
204+
Args:
205+
table_schema: The BigQuery table schema to convert.
206+
message_name: Optional name for the root message type. Defaults to "TableRow".
207+
208+
Returns:
209+
A DescriptorProto that can be used with ProtoSchema in the Write API.
210+
211+
Raises:
212+
ValueError: If the schema contains unsupported field types.
213+
214+
Example:
215+
>>> from google.cloud.bigquery_storage_v1 import schema, types
216+
>>>
217+
>>> # Define a BigQuery schema
218+
>>> table_schema = types.TableSchema(fields=[
219+
... types.TableFieldSchema(
220+
... name="id",
221+
... type_=types.TableFieldSchema.Type.INT64,
222+
... mode=types.TableFieldSchema.Mode.REQUIRED
223+
... ),
224+
... types.TableFieldSchema(
225+
... name="name",
226+
... type_=types.TableFieldSchema.Type.STRING
227+
... ),
228+
... ])
229+
>>>
230+
>>> # Convert to proto descriptor
231+
>>> proto_descriptor = schema.table_schema_to_proto_descriptor(table_schema)
232+
>>>
233+
>>> # Use with Write API
234+
>>> proto_schema = types.ProtoSchema()
235+
>>> proto_schema.proto_descriptor = proto_descriptor
236+
237+
Note:
238+
For detailed information about BigQuery to Protocol Buffer type mappings,
239+
see: https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
240+
"""
241+
# Create the root descriptor
242+
descriptor = descriptor_pb2.DescriptorProto()
243+
descriptor.name = message_name
244+
245+
# Convert fields
246+
field_descriptors, nested_descriptors = _convert_fields_to_proto(table_schema.fields)
247+
248+
# Add field descriptors to the message
249+
for field_descriptor in field_descriptors:
250+
descriptor.field.append(field_descriptor)
251+
252+
# Add nested type descriptors
253+
for nested_descriptor in nested_descriptors:
254+
descriptor.nested_type.append(nested_descriptor)
255+
256+
return descriptor
257+
258+
259+
__all__ = ("table_schema_to_proto_descriptor",)

0 commit comments

Comments
 (0)