Skip to content

Commit f2536ae

Browse files
authored
fix: fix mypy (#15531)
Towards #15104 This PR addresses the following mypy failure for `google-cloud-bigquery-storage`: ``` (py3142) partheniou@partheniou-vm-3:~/git/google-cloud-python/packages/google-cloud-bigquery-storage$ nox -s mypy-3.14 nox > Running session mypy-3.14 nox > Creating virtual environment (virtualenv) using python3.14 in .nox/mypy-3-14 nox > python -m pip install 'mypy<1.16.0' types-requests types-protobuf nox > python -m pip install . nox > mypy -p google .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/gapic_types.py:25: error: Skipping analyzing "proto": module is installed, but missing library stubs or py.typed marker [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/gapic_types.py:25: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta/__init__.py:87: note: By default the bodies of untyped functions are not checked, consider using --check-untyped-defs [annotation-unchecked] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1alpha/__init__.py:87: note: By default the bodies of untyped functions are not checked, consider using --check-untyped-defs [annotation-unchecked] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:23: error: Cannot find implementation or library stub for module named "fastavro" [import-not-found] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:27: error: Skipping analyzing "google.rpc.error_details_pb2": module is installed, but missing library stubs or py.typed marker [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:27: error: Skipping analyzing "google.rpc": module is installed, but missing library stubs or py.typed marker [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:30: error: Library stubs not installed for "pandas" [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:30: note: Hint: "python3 -m pip install pandas-stubs" .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:30: note: (or run "mypy --install-types" to install all missing stub packages) .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/reader.py:34: error: Cannot find implementation or library stub for module named "pyarrow" [import-not-found] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/__init__.py:98: note: By default the bodies of untyped functions are not checked, consider using --check-untyped-defs [annotation-unchecked] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/__init__.py:98: note: By default the bodies of untyped functions are not checked, consider using --check-untyped-defs [annotation-unchecked] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:28: error: Skipping analyzing "grpc": module is installed, but missing library stubs or py.typed marker [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:58: error: Incompatible return value type (got "BaseException", expected "Exception") [return-value] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:112: error: Need type annotation for "_close_callbacks" (hint: "_close_callbacks: list[<type>] = ...") [var-annotated] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:115: error: Need type annotation for "_closed_connection" (hint: "_closed_connection: dict[<type>, <type>] = ...") [var-annotated] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:160: error: Incompatible types in assignment (expression has type "str", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:260: error: Need type annotation for "_queue" [var-annotated] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:319: error: Incompatible types in assignment (expression has type "BidiRpc", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:333: error: "None" has no attribute "add_done_callback" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:335: error: Incompatible types in assignment (expression has type "BackgroundConsumer", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:336: error: "None" has no attribute "start" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:345: error: "None" has no attribute "is_active" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:364: error: "None" has no attribute "is_active" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:395: error: Incompatible types in assignment (expression has type "str", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:432: error: "None" has no attribute "send" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:450: error: "None" has no attribute "stop" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1/writer.py:470: error: Incompatible types in assignment (expression has type "Exception", variable has type "StreamClosedError") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:28: error: Skipping analyzing "grpc": module is installed, but missing library stubs or py.typed marker [import-untyped] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:57: error: Incompatible return value type (got "BaseException", expected "Exception") [return-value] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:88: error: Need type annotation for "_close_callbacks" (hint: "_close_callbacks: list[<type>] = ...") [var-annotated] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:89: error: Need type annotation for "_futures_queue" [var-annotated] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:154: error: Incompatible types in assignment (expression has type "str", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:159: error: Incompatible types in assignment (expression has type "BidiRpc", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:173: error: "None" has no attribute "add_done_callback" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:175: error: Incompatible types in assignment (expression has type "BackgroundConsumer", variable has type "None") [assignment] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:176: error: "None" has no attribute "start" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:185: error: "None" has no attribute "is_active" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:204: error: "None" has no attribute "is_active" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:254: error: "None" has no attribute "send" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:305: error: "None" has no attribute "stop" [attr-defined] .nox/mypy-3-14/lib/python3.14/site-packages/google/cloud/bigquery_storage_v1beta2/writer.py:326: error: Incompatible types in assignment (expression has type "Exception", variable has type "StreamClosedError") [assignment] Found 36 errors in 4 files (checked 87 source files) nox > Command mypy -p google failed with exit code 1 nox > Session mypy-3.14 failed. ```
1 parent c5ee986 commit f2536ae

File tree

6 files changed

+43
-37
lines changed

6 files changed

+43
-37
lines changed

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/gapic_types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222

2323
from google.protobuf import message as protobuf_message
2424
from google.protobuf import timestamp_pb2
25-
import proto
25+
26+
import proto # type: ignore
2627

2728
from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream
2829

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@
2222
try:
2323
import fastavro
2424
except ImportError: # pragma: NO COVER
25-
fastavro = None
25+
fastavro = None # type: ignore
2626
import google.api_core.exceptions
27-
import google.rpc.error_details_pb2
27+
import google.rpc.error_details_pb2 # type: ignore
2828

2929
try:
3030
import pandas
3131
except ImportError: # pragma: NO COVER
32-
pandas = None
33-
try:
34-
import pyarrow
35-
except ImportError: # pragma: NO COVER
36-
pyarrow = None
32+
pandas = None # type: ignore
3733

3834
try:
39-
import pyarrow
35+
# TODO(https://github.com/apache/arrow/issues/32609):
36+
# Remove `type: ignore` once this bug is fixed
37+
import pyarrow # type: ignore
4038
except ImportError: # pragma: NO COVER
41-
pyarrow = None
39+
pyarrow = None # type: ignore
4240

4341

4442
_STREAM_RESUMPTION_EXCEPTIONS = (

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/writer.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import queue
2121
import threading
2222
import time
23-
from typing import Callable, Optional, Sequence, Tuple
23+
from typing import Callable, List, Optional, Sequence, Tuple, Union
2424

2525
from google.api_core import bidi, exceptions
2626
from google.api_core.future import polling as polling_future
2727
import google.api_core.retry
28-
import grpc
28+
import grpc # type: ignore
2929

3030
from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions
3131
from google.cloud.bigquery_storage_v1 import gapic_version as package_version
@@ -44,7 +44,7 @@
4444
_DEFAULT_TIMEOUT = 600
4545

4646

47-
def _wrap_as_exception(maybe_exception) -> Exception:
47+
def _wrap_as_exception(maybe_exception) -> BaseException:
4848
"""Wrap an object as a Python exception, if needed.
4949
Args:
5050
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
@@ -109,12 +109,12 @@ def __init__(
109109
"""
110110
self._client = client
111111
self._closed = False
112-
self._close_callbacks = []
112+
self._close_callbacks: List[Callable] = []
113113
self._metadata = metadata
114114
self._thread_lock = threading.RLock()
115-
self._closed_connection = {}
115+
self._closed_connection: Union[_Connection | None] = None
116116

117-
self._stream_name = None
117+
self._stream_name: str = ""
118118

119119
# Make a deepcopy of the template and clear the proto3-only fields
120120
self._initial_request_template = _process_request_template(
@@ -156,7 +156,7 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture:
156156
A future, which can be used to process the response when it
157157
arrives.
158158
"""
159-
if self._stream_name is None:
159+
if not self._stream_name:
160160
self._stream_name = request.write_stream
161161
elif request.write_stream != self._stream_name:
162162
raise ValueError(
@@ -211,7 +211,7 @@ def _renew_connection(self, reason: Optional[Exception] = None) -> None:
211211
# critical section, this step is not guaranteed to be atomic.
212212
_closed_connection._shutdown(reason=reason)
213213

214-
def _on_rpc_done(self, reason: Optional[Exception] = None) -> None:
214+
def _on_rpc_done(self, reason: Optional[BaseException] = None) -> None:
215215
"""Callback passecd to _Connection. It's called when the RPC connection
216216
is closed without recovery. Spins up a new thread to call the helper
217217
function `_renew_connection()`, which creates a new connection and
@@ -254,10 +254,10 @@ def __init__(
254254
self._metadata = metadata
255255
self._thread_lock = threading.RLock()
256256

257-
self._rpc = None
258-
self._consumer = None
259-
self._stream_name = None
260-
self._queue = queue.Queue()
257+
self._rpc: Union[bidi.BidiRpc | None] = None
258+
self._consumer: Union[bidi.BackgroundConsumer | None] = None
259+
self._stream_name: str = ""
260+
self._queue: queue.Queue[AppendRowsFuture] = queue.Queue()
261261

262262
# statuses
263263
self._closed = False
@@ -429,7 +429,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture:
429429
# pull it off and notify completion.
430430
future = AppendRowsFuture(self._writer)
431431
self._queue.put(future)
432-
self._rpc.send(request)
432+
if self._rpc is not None:
433+
self._rpc.send(request)
433434
return future
434435

435436
def _shutdown(self, reason: Optional[Exception] = None) -> None:
@@ -447,7 +448,8 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None:
447448
# Stop consuming messages.
448449
if self.is_active:
449450
_LOGGER.debug("Stopping consumer.")
450-
self._consumer.stop()
451+
if self._consumer is not None:
452+
self._consumer.stop()
451453
self._consumer = None
452454

453455
if self._rpc is not None:
@@ -462,6 +464,7 @@ def _shutdown(self, reason: Optional[Exception] = None) -> None:
462464
# stopped (or at least is attempting to stop), we won't get
463465
# response callbacks to populate the remaining futures.
464466
future = self._queue.get_nowait()
467+
exc: Union[Exception, bqstorage_exceptions.StreamClosedError]
465468
if reason is None:
466469
exc = bqstorage_exceptions.StreamClosedError(
467470
"Stream closed before receiving a response."

packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1beta2/writer.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import queue
2121
import threading
2222
import time
23-
from typing import Callable, Optional, Sequence, Tuple
23+
from typing import Callable, List, Optional, Sequence, Tuple, Union
2424

2525
from google.api_core import bidi, exceptions
2626
from google.api_core.future import polling as polling_future
2727
import google.api_core.retry
28-
import grpc
28+
import grpc # type: ignore
2929

3030
from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
3131
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
@@ -43,7 +43,7 @@
4343
_DEFAULT_TIMEOUT = 600
4444

4545

46-
def _wrap_as_exception(maybe_exception) -> Exception:
46+
def _wrap_as_exception(maybe_exception) -> Union[BaseException]:
4747
"""Wrap an object as a Python exception, if needed.
4848
Args:
4949
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
@@ -85,19 +85,19 @@ def __init__(
8585
self._client = client
8686
self._closing = threading.Lock()
8787
self._closed = False
88-
self._close_callbacks = []
89-
self._futures_queue = queue.Queue()
88+
self._close_callbacks: List[Callable] = []
89+
self._futures_queue: queue.Queue[AppendRowsFuture] = queue.Queue()
9090
self._inital_request_template = initial_request_template
9191
self._metadata = metadata
9292

9393
# Only one call to `send()` should attempt to open the RPC.
9494
self._opening = threading.Lock()
9595

96-
self._rpc = None
97-
self._stream_name = None
96+
self._rpc: Union[bidi.BidiRpc | None] = None
97+
self._stream_name: str = ""
9898

9999
# The threads created in ``._open()``.
100-
self._consumer = None
100+
self._consumer: Union[bidi.BackgroundConsumer | None] = None
101101

102102
@property
103103
def is_active(self) -> bool:
@@ -251,7 +251,8 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
251251
# pull it off and notify completion.
252252
future = AppendRowsFuture(self)
253253
self._futures_queue.put(future)
254-
self._rpc.send(request)
254+
if self._rpc is not None:
255+
self._rpc.send(request)
255256
return future
256257

257258
def _on_response(self, response: gapic_types.AppendRowsResponse):
@@ -302,7 +303,8 @@ def _shutdown(self, reason: Optional[Exception] = None):
302303
# Stop consuming messages.
303304
if self.is_active:
304305
_LOGGER.debug("Stopping consumer.")
305-
self._consumer.stop()
306+
if self._consumer is not None:
307+
self._consumer.stop()
306308
self._consumer = None
307309

308310
if self._rpc is not None:
@@ -318,6 +320,7 @@ def _shutdown(self, reason: Optional[Exception] = None):
318320
# stopped (or at least is attempting to stop), we won't get
319321
# response callbacks to populate the remaining futures.
320322
future = self._futures_queue.get_nowait()
323+
exc: Union[Exception, bqstorage_exceptions.StreamClosedError]
321324
if reason is None:
322325
exc = bqstorage_exceptions.StreamClosedError(
323326
"Stream closed before receiving a response."

packages/google-cloud-bigquery-storage/noxfile.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,9 @@ def mypy(session):
112112
"mypy<1.16.0",
113113
"types-requests",
114114
"types-protobuf",
115+
"pandas-stubs",
115116
)
116-
session.install(".")
117+
session.install(".[fastavro]")
117118
session.run(
118119
"mypy",
119120
"-p",

packages/google-cloud-bigquery-storage/tests/unit/test_writer_v1.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_ctor_defaults(self):
6666
assert stream._closed is False
6767
assert not stream._close_callbacks
6868
assert stream._metadata == ()
69-
assert stream._stream_name is None
69+
assert stream._stream_name == ""
7070
assert isinstance(stream._thread_lock, type(threading.RLock()))
7171

7272
assert isinstance(stream._connection, _Connection)
@@ -229,7 +229,7 @@ def test_ctor_defaults(self):
229229
assert isinstance(connection._thread_lock, type(threading.RLock()))
230230
assert connection._rpc is None
231231
assert connection._consumer is None
232-
assert connection._stream_name is None
232+
assert connection._stream_name == ""
233233
assert isinstance(connection._queue, queue.Queue)
234234

235235
assert connection._closed is False

0 commit comments

Comments
 (0)