Skip to content

Commit e3ec02b

Browse files
committed
got tests running
1 parent a1dffb5 commit e3ec02b

File tree

4 files changed

+47
-14
lines changed

4 files changed

+47
-14
lines changed

google/cloud/bigtable/data/_async/_replaceable_channel.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,15 @@ def __call__(self, *args, **kwargs) -> Call:
5555
class WrappedUnaryUnaryMultiCallable(
5656
_WrappedMultiCallable, UnaryUnaryMultiCallable
5757
):
58-
pass
58+
if not CrossSync.is_async:
59+
# add missing functions for sync unary callable
60+
61+
def with_call(self, *args, **kwargs):
62+
call = self.__call__(self, *args, **kwargs)
63+
return call(), call
64+
65+
def future(self, *args, **kwargs):
66+
raise NotImplementedError
5967

6068

6169
class WrappedUnaryStreamMultiCallable(
@@ -114,6 +122,8 @@ def stream_stream(self, *args, **kwargs) -> StreamStreamMultiCallable:
114122
)(*call_args, **call_kwargs)
115123
)
116124

125+
# grace not supported by sync version
126+
@CrossSync.drop
117127
async def close(self, grace=None):
118128
return await self._channel.close(grace=grace)
119129

@@ -138,6 +148,18 @@ async def wait_for_state_change(self, last_observed_state):
138148
def __getattr__(self, name):
139149
return getattr(self._channel, name)
140150

151+
if not CrossSync.is_async:
152+
153+
def close(self):
154+
return self._channel.close()
155+
156+
def subscribe(self, callback, try_to_connect=False):
157+
return self._channel.subscribe(callback, try_to_connect)
158+
159+
def unsubscribe(self, callback):
160+
return self._channel.unsubscribe(callback)
161+
162+
141163
@CrossSync.convert_class(sync_name="_ReplaceableChannel", replace_symbols={"_AsyncWrappedChannel": "_WrappedChannel"})
142164
class _AsyncReplaceableChannel(_AsyncWrappedChannel):
143165

google/cloud/bigtable/data/_async/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
from grpc import intercept_channel
104104
from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport as TransportType # type: ignore
105105
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE
106-
from google.cloud.bigtable.data._async._replaceable_channel import _ReplaceableChannel
106+
from google.cloud.bigtable.data._sync_autogen._replaceable_channel import _ReplaceableChannel
107107

108108

109109
if TYPE_CHECKING:
@@ -346,7 +346,7 @@ async def _ping_and_warm_instances(
346346
)
347347
return [r or None for r in result_list]
348348

349-
@CrossSync.convert
349+
@CrossSync.convert(replace_symbols={"_AsyncReplaceableChannel": "_ReplaceableChannel"})
350350
async def _manage_channel(
351351
self,
352352
refresh_interval_min: float = 60 * 35,
@@ -397,7 +397,7 @@ async def _manage_channel(
397397
new_channel = super_channel.create_channel()
398398
await self._ping_and_warm_instances(channel=new_channel)
399399
# cycle channel out of use, with long grace window before closure
400-
old_channel = super_channel.replace_wrapped_channel(new_channel, grace_period)
400+
old_channel = super_channel.replace_wrapped_channel(new_channel)
401401
# give old_channel a chance to complete existing rpcs
402402
if CrossSync.is_async:
403403
await old_channel.close(grace_period)

google/cloud/bigtable/data/_sync_autogen/_replaceable_channel.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ def __call__(self, *args, **kwargs) -> Call:
4141

4242

4343
class WrappedUnaryUnaryMultiCallable(_WrappedMultiCallable, UnaryUnaryMultiCallable):
44-
pass
44+
def with_call(self, *args, **kwargs):
45+
call = self.__call__(self, *args, **kwargs)
46+
return (call(), call)
47+
48+
def future(self, *args, **kwargs):
49+
raise NotImplementedError
4550

4651

4752
class WrappedUnaryStreamMultiCallable(_WrappedMultiCallable, UnaryStreamMultiCallable):
@@ -95,9 +100,6 @@ def stream_stream(self, *args, **kwargs) -> StreamStreamMultiCallable:
95100
)(*call_args, **call_kwargs)
96101
)
97102

98-
def close(self, grace=None):
99-
return self._channel.close(grace=grace)
100-
101103
def channel_ready(self):
102104
return self._channel.channel_ready()
103105

@@ -117,6 +119,15 @@ def wait_for_state_change(self, last_observed_state):
117119
def __getattr__(self, name):
118120
return getattr(self._channel, name)
119121

122+
def close(self):
123+
return self._channel.close()
124+
125+
def subscribe(self, callback, try_to_connect=False):
126+
return self._channel.subscribe(callback, try_to_connect)
127+
128+
def unsubscribe(self, callback):
129+
return self._channel.unsubscribe(callback)
130+
120131

121132
class _ReplaceableChannel(_WrappedChannel):
122133
def __init__(self, channel_fn: Callable[[], Channel]):

google/cloud/bigtable/data/_sync_autogen/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@
7979
BigtableGrpcTransport as TransportType,
8080
)
8181
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE
82-
from google.cloud.bigtable.data._async._replaceable_channel import _ReplaceableChannel
82+
from google.cloud.bigtable.data._sync_autogen._replaceable_channel import (
83+
_ReplaceableChannel,
84+
)
8385

8486
if TYPE_CHECKING:
8587
from google.cloud.bigtable.data._helpers import RowKeySamples
@@ -282,10 +284,10 @@ def _manage_channel(
282284
between `refresh_interval_min` and `refresh_interval_max`
283285
grace_period: time to allow previous channel to serve existing
284286
requests before closing, in seconds"""
285-
if not isinstance(self.transport.grpc_channel, _AsyncReplaceableChannel):
287+
if not isinstance(self.transport.grpc_channel, _ReplaceableChannel):
286288
warnings.warn("Channel does not support auto-refresh.")
287289
return
288-
super_channel: _AsyncReplaceableChannel = self.transport.grpc_channel
290+
super_channel: _ReplaceableChannel = self.transport.grpc_channel
289291
first_refresh = self._channel_init_time + random.uniform(
290292
refresh_interval_min, refresh_interval_max
291293
)
@@ -301,9 +303,7 @@ def _manage_channel(
301303
start_timestamp = time.monotonic()
302304
new_channel = super_channel.create_channel()
303305
self._ping_and_warm_instances(channel=new_channel)
304-
old_channel = super_channel.replace_wrapped_channel(
305-
new_channel, grace_period
306-
)
306+
old_channel = super_channel.replace_wrapped_channel(new_channel)
307307
if grace_period:
308308
self._is_closed.wait(grace_period)
309309
old_channel.close()

0 commit comments

Comments
 (0)