Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

from google.protobuf import message as protobuf_message
from google.protobuf import timestamp_pb2
import proto

import proto # type: ignore

from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,21 @@
try:
import fastavro
except ImportError: # pragma: NO COVER
fastavro = None
fastavro = None # type: ignore
import google.api_core.exceptions
import google.rpc.error_details_pb2
import google.rpc.error_details_pb2 # type: ignore

try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None
try:
import pyarrow
except ImportError: # pragma: NO COVER
pyarrow = None
pandas = None # type: ignore

try:
import pyarrow
# TODO(https://github.com/apache/arrow/issues/32609):
# Remove `type: ignore` once this bug is fixed
import pyarrow # type: ignore
except ImportError: # pragma: NO COVER
pyarrow = None
pyarrow = None # type: ignore


_STREAM_RESUMPTION_EXCEPTIONS = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import queue
import threading
import time
from typing import Callable, Optional, Sequence, Tuple
from typing import Callable, List, Optional, Sequence, Tuple, Union

from google.api_core import bidi, exceptions
from google.api_core.future import polling as polling_future
import google.api_core.retry
import grpc
import grpc # type: ignore

from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions
from google.cloud.bigquery_storage_v1 import gapic_version as package_version
Expand All @@ -44,7 +44,7 @@
_DEFAULT_TIMEOUT = 600


def _wrap_as_exception(maybe_exception) -> Exception:
def _wrap_as_exception(maybe_exception) -> BaseException:
"""Wrap an object as a Python exception, if needed.
Args:
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
Expand Down Expand Up @@ -109,12 +109,12 @@ def __init__(
"""
self._client = client
self._closed = False
self._close_callbacks = []
self._close_callbacks: List[Callable] = []
self._metadata = metadata
self._thread_lock = threading.RLock()
self._closed_connection = {}
self._closed_connection: Union[_Connection | None] = None

self._stream_name = None
self._stream_name: str = ""

# Make a deepcopy of the template and clear the proto3-only fields
self._initial_request_template = _process_request_template(
Expand Down Expand Up @@ -156,7 +156,7 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture:
A future, which can be used to process the response when it
arrives.
"""
if self._stream_name is None:
if not self._stream_name:
self._stream_name = request.write_stream
elif request.write_stream != self._stream_name:
raise ValueError(
Expand Down Expand Up @@ -211,7 +211,7 @@ def _renew_connection(self, reason: Optional[Exception] = None) -> None:
# critical section, this step is not guaranteed to be atomic.
_closed_connection._shutdown(reason=reason)

def _on_rpc_done(self, reason: Optional[Exception] = None) -> None:
def _on_rpc_done(self, reason: Optional[BaseException] = None) -> None:
"""Callback passecd to _Connection. It's called when the RPC connection
is closed without recovery. Spins up a new thread to call the helper
function `_renew_connection()`, which creates a new connection and
Expand Down Expand Up @@ -254,10 +254,10 @@ def __init__(
self._metadata = metadata
self._thread_lock = threading.RLock()

self._rpc = None
self._consumer = None
self._stream_name = None
self._queue = queue.Queue()
self._rpc: Union[bidi.BidiRpc | None] = None
self._consumer: Union[bidi.BackgroundConsumer | None] = None
self._stream_name: str = ""
self._queue: queue.Queue[AppendRowsFuture] = queue.Queue()

# statuses
self._closed = False
Expand Down Expand Up @@ -429,7 +429,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture:
# pull it off and notify completion.
future = AppendRowsFuture(self._writer)
self._queue.put(future)
self._rpc.send(request)
if self._rpc is not None:
self._rpc.send(request)
return future

def _shutdown(self, reason: Optional[Exception] = None) -> None:
Expand All @@ -447,7 +448,8 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None:
# Stop consuming messages.
if self.is_active:
_LOGGER.debug("Stopping consumer.")
self._consumer.stop()
if self._consumer is not None:
self._consumer.stop()
self._consumer = None

if self._rpc is not None:
Expand All @@ -462,6 +464,7 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None:
# stopped (or at least is attempting to stop), we won't get
# response callbacks to populate the remaining futures.
future = self._queue.get_nowait()
exc: Union[Exception, bqstorage_exceptions.StreamClosedError]
if reason is None:
exc = bqstorage_exceptions.StreamClosedError(
"Stream closed before receiving a response."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import queue
import threading
import time
from typing import Callable, Optional, Sequence, Tuple
from typing import Callable, List, Optional, Sequence, Tuple, Union

from google.api_core import bidi, exceptions
from google.api_core.future import polling as polling_future
import google.api_core.retry
import grpc
import grpc # type: ignore

from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
Expand All @@ -43,7 +43,7 @@
_DEFAULT_TIMEOUT = 600


def _wrap_as_exception(maybe_exception) -> Exception:
def _wrap_as_exception(maybe_exception) -> Union[BaseException]:
"""Wrap an object as a Python exception, if needed.
Args:
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
Expand Down Expand Up @@ -85,19 +85,19 @@ def __init__(
self._client = client
self._closing = threading.Lock()
self._closed = False
self._close_callbacks = []
self._futures_queue = queue.Queue()
self._close_callbacks: List[Callable] = []
self._futures_queue: queue.Queue[AppendRowsFuture] = queue.Queue()
self._inital_request_template = initial_request_template
self._metadata = metadata

# Only one call to `send()` should attempt to open the RPC.
self._opening = threading.Lock()

self._rpc = None
self._stream_name = None
self._rpc: Union[bidi.BidiRpc | None] = None
self._stream_name: str = ""

# The threads created in ``._open()``.
self._consumer = None
self._consumer: Union[bidi.BackgroundConsumer | None] = None

@property
def is_active(self) -> bool:
Expand Down Expand Up @@ -251,7 +251,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
# pull it off and notify completion.
future = AppendRowsFuture(self)
self._futures_queue.put(future)
self._rpc.send(request)
if self._rpc is not None:
self._rpc.send(request)
return future

def _on_response(self, response: gapic_types.AppendRowsResponse):
Expand Down Expand Up @@ -302,7 +303,8 @@ def _shutdown(self, reason: Optional[Exception] = None):
# Stop consuming messages.
if self.is_active:
_LOGGER.debug("Stopping consumer.")
self._consumer.stop()
if self._consumer is not None:
self._consumer.stop()
self._consumer = None

if self._rpc is not None:
Expand All @@ -318,6 +320,7 @@ def _shutdown(self, reason: Optional[Exception] = None):
# stopped (or at least is attempting to stop), we won't get
# response callbacks to populate the remaining futures.
future = self._futures_queue.get_nowait()
exc: Union[Exception, bqstorage_exceptions.StreamClosedError]
if reason is None:
exc = bqstorage_exceptions.StreamClosedError(
"Stream closed before receiving a response."
Expand Down
3 changes: 2 additions & 1 deletion packages/google-cloud-bigquery-storage/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ def mypy(session):
"mypy<1.16.0",
"types-requests",
"types-protobuf",
"pandas-stubs",
)
session.install(".")
session.install(".[fastavro]")
session.run(
"mypy",
"-p",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_ctor_defaults(self):
assert stream._closed is False
assert not stream._close_callbacks
assert stream._metadata == ()
assert stream._stream_name is None
assert stream._stream_name == ""
assert isinstance(stream._thread_lock, type(threading.RLock()))

assert isinstance(stream._connection, _Connection)
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_ctor_defaults(self):
assert isinstance(connection._thread_lock, type(threading.RLock()))
assert connection._rpc is None
assert connection._consumer is None
assert connection._stream_name is None
assert connection._stream_name == ""
assert isinstance(connection._queue, queue.Queue)

assert connection._closed is False
Expand Down
Loading