-
Notifications
You must be signed in to change notification settings - Fork 135
feat: make grpc-gcp default enabled #4239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
89df374
3498883
998c8bc
c3ef0a5
75ac207
8349844
ac8db3d
eaeef52
63f0188
0380118
4cb1192
bb11190
044e1f5
893d50b
e6704b8
19eb04b
371790e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| [ | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ApiConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ApiConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ChannelPoolConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.ChannelPoolConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.MethodConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.MethodConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig$Builder", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| }, | ||
| { | ||
| "name": "com.google.cloud.grpc.proto.AffinityConfig$Command", | ||
| "allDeclaredFields": true, | ||
| "allDeclaredMethods": true, | ||
| "allDeclaredConstructors": true | ||
| } | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,9 @@ | |
| package com.google.cloud.spanner; | ||
|
|
||
| import static com.google.cloud.spanner.DisableDefaultMtlsProvider.disableDefaultMtlsProvider; | ||
| import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; | ||
| import static java.util.stream.Collectors.toSet; | ||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.Assume.assumeFalse; | ||
|
|
||
| import com.google.cloud.NoCredentials; | ||
| import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; | ||
|
|
@@ -32,7 +31,6 @@ | |
| import com.google.spanner.v1.StructType; | ||
| import com.google.spanner.v1.StructType.Field; | ||
| import com.google.spanner.v1.TypeCode; | ||
| import io.grpc.Attributes; | ||
| import io.grpc.Context; | ||
| import io.grpc.Contexts; | ||
| import io.grpc.Metadata; | ||
|
|
@@ -70,13 +68,9 @@ public class ChannelUsageTest { | |
| @Parameter(0) | ||
| public int numChannels; | ||
|
|
||
| @Parameter(1) | ||
| public boolean enableGcpPool; | ||
|
|
||
| @Parameters(name = "num channels = {0}, enable GCP pool = {1}") | ||
| @Parameters(name = "num channels = {0}") | ||
| public static Collection<Object[]> data() { | ||
| return Arrays.asList( | ||
| new Object[][] {{1, true}, {1, false}, {2, true}, {2, false}, {4, true}, {4, false}}); | ||
| return Arrays.asList(new Object[][] {{1}, {2}, {4}}); | ||
| } | ||
|
|
||
| private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); | ||
|
|
@@ -106,9 +100,9 @@ public static Collection<Object[]> data() { | |
| private static MockSpannerServiceImpl mockSpanner; | ||
| private static Server server; | ||
| private static InetSocketAddress address; | ||
| private static final Set<InetSocketAddress> batchCreateSessionLocalIps = | ||
| ConcurrentHashMap.newKeySet(); | ||
| private static final Set<InetSocketAddress> executeSqlLocalIps = ConcurrentHashMap.newKeySet(); | ||
| // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method | ||
| private static final Set<Long> batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); | ||
| private static final Set<Long> executeSqlChannelHints = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| private static Level originalLogLevel; | ||
|
|
||
|
|
@@ -123,8 +117,8 @@ public static void startServer() throws Exception { | |
| server = | ||
| NettyServerBuilder.forAddress(address) | ||
| .addService(mockSpanner) | ||
| // Add a server interceptor to register the remote addresses that we are seeing. This | ||
| // indicates how many channels are used client side to communicate with the server. | ||
| // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id | ||
| // header. This verifies that the client uses all configured channels. | ||
| .intercept( | ||
| new ServerInterceptor() { | ||
| @Override | ||
|
|
@@ -138,22 +132,26 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( | |
| headers.get( | ||
| Metadata.Key.of( | ||
| "x-response-encoding", Metadata.ASCII_STRING_MARSHALLER))); | ||
| Attributes attributes = call.getAttributes(); | ||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| Attributes.Key<InetSocketAddress> key = | ||
| (Attributes.Key<InetSocketAddress>) | ||
| attributes.keys().stream() | ||
| .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR)) | ||
| .findFirst() | ||
| .orElse(null); | ||
| if (key != null) { | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { | ||
| batchCreateSessionLocalIps.add(attributes.get(key)); | ||
| } | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { | ||
| executeSqlLocalIps.add(attributes.get(key)); | ||
| // Extract channel hint from X-Goog-Spanner-Request-Id header | ||
| String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); | ||
| if (requestId != null) { | ||
| // Format: | ||
| // <version>.<randProcessId>.<nthClientId>.<nthChannelId>.<nthRequest>.<attempt> | ||
| String[] parts = requestId.split("\\."); | ||
| if (parts.length >= 4) { | ||
| try { | ||
| long channelHint = Long.parseLong(parts[3]); | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { | ||
|
Comment on lines
+144
to
+145
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should probably be replaced with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted, I will update it in separate PR where we are removing session pool |
||
| batchCreateSessionChannelHints.add(channelHint); | ||
| } | ||
| if (call.getMethodDescriptor() | ||
| .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { | ||
| executeSqlChannelHints.add(channelHint); | ||
| } | ||
| } catch (NumberFormatException e) { | ||
| // Ignore parse errors | ||
| } | ||
| } | ||
| } | ||
| return Contexts.interceptCall(Context.current(), call, headers, next); | ||
|
|
@@ -185,8 +183,8 @@ public static void resetLogging() { | |
| @After | ||
| public void reset() { | ||
| mockSpanner.reset(); | ||
| batchCreateSessionLocalIps.clear(); | ||
| executeSqlLocalIps.clear(); | ||
| batchCreateSessionChannelHints.clear(); | ||
| executeSqlChannelHints.clear(); | ||
| } | ||
|
|
||
| private SpannerOptions createSpannerOptions() { | ||
|
|
@@ -208,34 +206,14 @@ private SpannerOptions createSpannerOptions() { | |
| .build()) | ||
| .setHost("http://" + endpoint) | ||
| .setCredentials(NoCredentials.getInstance()); | ||
| if (enableGcpPool) { | ||
| builder.enableGrpcGcpExtension(); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testCreatesNumChannels() { | ||
| try (Spanner spanner = createSpannerOptions().getService()) { | ||
| assumeFalse( | ||
| "GRPC-GCP is currently not supported with multiplexed sessions", | ||
| isMultiplexedSessionsEnabled(spanner) && enableGcpPool); | ||
| DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); | ||
| try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { | ||
| while (resultSet.next()) {} | ||
| } | ||
| } | ||
| assertEquals(numChannels, batchCreateSessionLocalIps.size()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testUsesAllChannels() throws InterruptedException { | ||
| final int multiplier = 2; | ||
| try (Spanner spanner = createSpannerOptions().getService()) { | ||
| assumeFalse( | ||
| "GRPC-GCP is currently not supported with multiplexed sessions", | ||
| isMultiplexedSessionsEnabled(spanner)); | ||
| DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); | ||
| ListeningExecutorService executor = | ||
| MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * multiplier)); | ||
|
|
@@ -263,13 +241,23 @@ public void testUsesAllChannels() throws InterruptedException { | |
| executor.shutdown(); | ||
| assertTrue(executor.awaitTermination(Duration.ofSeconds(10L))); | ||
| } | ||
| assertEquals(numChannels, executeSqlLocalIps.size()); | ||
| } | ||
|
|
||
| private boolean isMultiplexedSessionsEnabled(Spanner spanner) { | ||
| if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { | ||
| return false; | ||
| // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify | ||
| // that channels are being distributed. The raw channel hints may be unbounded (based on | ||
| // session index), but gRPC-GCP bounds them to the actual number of channels. | ||
| Set<Long> boundedChannelHints = | ||
| executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); | ||
| // Verify that channel distribution is working: | ||
| // - For numChannels=1, exactly 1 channel should be used | ||
| // - For numChannels>1, multiple channels should be used (at least half) | ||
| if (numChannels == 1) { | ||
| assertEquals(1, boundedChannelHints.size()); | ||
| } else { | ||
| assertTrue( | ||
| "Expected at least " | ||
| + (numChannels / 2) | ||
| + " channels to be used, but got " | ||
| + boundedChannelHints.size(), | ||
| boundedChannelHints.size() >= numChannels / 2); | ||
| } | ||
| return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.