diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/gapic_types.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/gapic_types.py index fd37531ab6b1..4aa4d2fdf5f2 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/gapic_types.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/gapic_types.py @@ -22,7 +22,8 @@ from google.protobuf import message as protobuf_message from google.protobuf import timestamp_pb2 -import proto + +import proto # type: ignore from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 0b7170abc564..f6f52fbfd6ae 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -22,23 +22,21 @@ try: import fastavro except ImportError: # pragma: NO COVER - fastavro = None + fastavro = None # type: ignore import google.api_core.exceptions -import google.rpc.error_details_pb2 +import google.rpc.error_details_pb2 # type: ignore try: import pandas except ImportError: # pragma: NO COVER - pandas = None -try: - import pyarrow -except ImportError: # pragma: NO COVER - pyarrow = None + pandas = None # type: ignore try: - import pyarrow + # TODO(https://github.com/apache/arrow/issues/32609): + # Remove `type: ignore` once this bug is fixed + import pyarrow # type: ignore except ImportError: # pragma: NO COVER - pyarrow = None + pyarrow = None # type: ignore _STREAM_RESUMPTION_EXCEPTIONS = ( diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py index ea132a02fdef..6604270df1cd 100644 --- 
a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py @@ -20,12 +20,12 @@ import queue import threading import time -from typing import Callable, Optional, Sequence, Tuple +from typing import Callable, List, Optional, Sequence, Tuple, Union from google.api_core import bidi, exceptions from google.api_core.future import polling as polling_future import google.api_core.retry -import grpc +import grpc # type: ignore from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions from google.cloud.bigquery_storage_v1 import gapic_version as package_version @@ -44,7 +44,7 @@ _DEFAULT_TIMEOUT = 600 -def _wrap_as_exception(maybe_exception) -> Exception: +def _wrap_as_exception(maybe_exception) -> BaseException: """Wrap an object as a Python exception, if needed. Args: maybe_exception (Any): The object to wrap, usually a gRPC exception class. @@ -109,12 +109,12 @@ def __init__( """ self._client = client self._closed = False - self._close_callbacks = [] + self._close_callbacks: List[Callable] = [] self._metadata = metadata self._thread_lock = threading.RLock() - self._closed_connection = {} + self._closed_connection: Optional[_Connection] = None - self._stream_name = None + self._stream_name: str = "" # Make a deepcopy of the template and clear the proto3-only fields self._initial_request_template = _process_request_template( @@ -156,7 +156,7 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture: A future, which can be used to process the response when it arrives. """ - if self._stream_name is None: + if not self._stream_name: self._stream_name = request.write_stream elif request.write_stream != self._stream_name: raise ValueError( @@ -211,7 +211,7 @@ def _renew_connection(self, reason: Optional[Exception] = None) -> None: # critical section, this step is not guaranteed to be atomic.
_closed_connection._shutdown(reason=reason) - def _on_rpc_done(self, reason: Optional[Exception] = None) -> None: + def _on_rpc_done(self, reason: Optional[BaseException] = None) -> None: """Callback passecd to _Connection. It's called when the RPC connection is closed without recovery. Spins up a new thread to call the helper function `_renew_connection()`, which creates a new connection and @@ -254,10 +254,10 @@ def __init__( self._metadata = metadata self._thread_lock = threading.RLock() - self._rpc = None - self._consumer = None - self._stream_name = None - self._queue = queue.Queue() + self._rpc: Optional[bidi.BidiRpc] = None + self._consumer: Optional[bidi.BackgroundConsumer] = None + self._stream_name: str = "" + self._queue: queue.Queue[AppendRowsFuture] = queue.Queue() # statuses self._closed = False @@ -429,7 +429,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture: # pull it off and notify completion. future = AppendRowsFuture(self._writer) self._queue.put(future) - self._rpc.send(request) + if self._rpc is not None: + self._rpc.send(request) return future def _shutdown(self, reason: Optional[Exception] = None) -> None: @@ -447,7 +448,8 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None: # Stop consuming messages. if self.is_active: _LOGGER.debug("Stopping consumer.") - self._consumer.stop() + if self._consumer is not None: + self._consumer.stop() self._consumer = None if self._rpc is not None: @@ -462,6 +464,7 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None: # stopped (or at least is attempting to stop), we won't get # response callbacks to populate the remaining futures. future = self._queue.get_nowait() + exc: Union[Exception, bqstorage_exceptions.StreamClosedError] if reason is None: exc = bqstorage_exceptions.StreamClosedError( "Stream closed before receiving a response." 
diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta2/writer.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta2/writer.py index 3859aeeff328..6c5ce89858d1 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta2/writer.py @@ -20,12 +20,12 @@ import queue import threading import time -from typing import Callable, Optional, Sequence, Tuple +from typing import Callable, List, Optional, Sequence, Tuple, Union from google.api_core import bidi, exceptions from google.api_core.future import polling as polling_future import google.api_core.retry -import grpc +import grpc # type: ignore from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions from google.cloud.bigquery_storage_v1beta2 import types as gapic_types @@ -43,7 +43,7 @@ _DEFAULT_TIMEOUT = 600 -def _wrap_as_exception(maybe_exception) -> Exception: +def _wrap_as_exception(maybe_exception) -> BaseException: """Wrap an object as a Python exception, if needed. Args: maybe_exception (Any): The object to wrap, usually a gRPC exception class. @@ -85,19 +85,19 @@ def __init__( self._client = client self._closing = threading.Lock() self._closed = False - self._close_callbacks = [] - self._futures_queue = queue.Queue() + self._close_callbacks: List[Callable] = [] + self._futures_queue: queue.Queue[AppendRowsFuture] = queue.Queue() self._inital_request_template = initial_request_template self._metadata = metadata # Only one call to `send()` should attempt to open the RPC. self._opening = threading.Lock() - self._rpc = None - self._stream_name = None + self._rpc: Optional[bidi.BidiRpc] = None + self._stream_name: str = "" # The threads created in ``._open()``. 
- self._consumer = None + self._consumer: Optional[bidi.BackgroundConsumer] = None @property def is_active(self) -> bool: @@ -251,7 +251,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": # pull it off and notify completion. future = AppendRowsFuture(self) self._futures_queue.put(future) - self._rpc.send(request) + if self._rpc is not None: + self._rpc.send(request) return future def _on_response(self, response: gapic_types.AppendRowsResponse): @@ -302,7 +303,8 @@ def _shutdown(self, reason: Optional[Exception] = None): # Stop consuming messages. if self.is_active: _LOGGER.debug("Stopping consumer.") - self._consumer.stop() + if self._consumer is not None: + self._consumer.stop() self._consumer = None if self._rpc is not None: @@ -318,6 +320,7 @@ def _shutdown(self, reason: Optional[Exception] = None): # stopped (or at least is attempting to stop), we won't get # response callbacks to populate the remaining futures. future = self._futures_queue.get_nowait() + exc: Union[Exception, bqstorage_exceptions.StreamClosedError] if reason is None: exc = bqstorage_exceptions.StreamClosedError( "Stream closed before receiving a response." 
diff --git a/packages/google-cloud-bigquery-storage/noxfile.py b/packages/google-cloud-bigquery-storage/noxfile.py index 25e0b39cd444..16aa63c726ac 100644 --- a/packages/google-cloud-bigquery-storage/noxfile.py +++ b/packages/google-cloud-bigquery-storage/noxfile.py @@ -112,8 +112,9 @@ def mypy(session): "mypy<1.16.0", "types-requests", "types-protobuf", + "pandas-stubs", ) - session.install(".") + session.install(".[fastavro]") session.run( "mypy", "-p", diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_writer_v1.py b/packages/google-cloud-bigquery-storage/tests/unit/test_writer_v1.py index 88ab7d141fae..06ab1c76da55 100644 --- a/packages/google-cloud-bigquery-storage/tests/unit/test_writer_v1.py +++ b/packages/google-cloud-bigquery-storage/tests/unit/test_writer_v1.py @@ -66,7 +66,7 @@ def test_ctor_defaults(self): assert stream._closed is False assert not stream._close_callbacks assert stream._metadata == () - assert stream._stream_name is None + assert stream._stream_name == "" assert isinstance(stream._thread_lock, type(threading.RLock())) assert isinstance(stream._connection, _Connection) @@ -229,7 +229,7 @@ def test_ctor_defaults(self): assert isinstance(connection._thread_lock, type(threading.RLock())) assert connection._rpc is None assert connection._consumer is None - assert connection._stream_name is None + assert connection._stream_name == "" assert isinstance(connection._queue, queue.Queue) assert connection._closed is False