Skip to content

Commit 80217b1

Browse files
committed
feat: include RequestID in requests and errors
- Send a RequestID to Spanner for each request - Make sure that the attempt number of the RequestID is increased if the RPC is retried. - Include the RequestID in every error that is thrown due to an error that is returned by Spanner.
1 parent f9505a9 commit 80217b1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1336
-560
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbortedException.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,16 @@ public class AbortedException extends SpannerException {
3838
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
3939
AbortedException(
4040
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
41-
this(token, message, cause, null, null);
41+
this(token, message, cause, null);
4242
}
4343

4444
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
4545
AbortedException(
4646
DoNotConstructDirectly token,
4747
@Nullable String message,
4848
@Nullable Throwable cause,
49-
@Nullable ApiException apiException,
50-
@Nullable XGoogSpannerRequestId reqId) {
51-
super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException, reqId);
49+
@Nullable ApiException apiException) {
50+
super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException);
5251
if (cause instanceof AbortedException) {
5352
this.transactionID = ((AbortedException) cause).getTransactionID();
5453
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -457,30 +457,22 @@ void initTransaction() {
457457
}
458458

459459
private void initTransactionInternal(BeginTransactionRequest request) {
460-
XGoogSpannerRequestId reqId =
461-
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
462460
try {
463461
Transaction transaction =
464-
rpc.beginTransaction(
465-
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
462+
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
466463
if (!transaction.hasReadTimestamp()) {
467464
throw SpannerExceptionFactory.newSpannerException(
468-
ErrorCode.INTERNAL,
469-
"Missing expected transaction.read_timestamp metadata field",
470-
reqId);
465+
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
471466
}
472467
if (transaction.getId().isEmpty()) {
473468
throw SpannerExceptionFactory.newSpannerException(
474-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
469+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
475470
}
476471
try {
477472
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
478473
} catch (IllegalArgumentException e) {
479474
throw SpannerExceptionFactory.newSpannerException(
480-
ErrorCode.INTERNAL,
481-
"Bad value in transaction.read_timestamp metadata field",
482-
e,
483-
reqId);
475+
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
484476
}
485477
transactionId = transaction.getId();
486478
span.addAnnotation(
@@ -816,7 +808,8 @@ ResultSet executeQueryInternalWithOptions(
816808
@Override
817809
CloseableIterator<PartialResultSet> startStream(
818810
@Nullable ByteString resumeToken,
819-
AsyncResultSet.StreamMessageListener streamListener) {
811+
AsyncResultSet.StreamMessageListener streamListener,
812+
XGoogSpannerRequestId requestId) {
820813
GrpcStreamIterator stream =
821814
new GrpcStreamIterator(
822815
statement,
@@ -839,12 +832,12 @@ CloseableIterator<PartialResultSet> startStream(
839832
if (selector != null) {
840833
request.setTransaction(selector);
841834
}
842-
this.ensureNonNullXGoogRequestId();
843835
SpannerRpc.StreamingCall call =
844836
rpc.executeQuery(
845837
request.build(),
846838
stream.consumer(),
847-
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
839+
getTransactionChannelHint(),
840+
requestId,
848841
isRouteToLeader());
849842
session.markUsed(clock.instant());
850843
stream.setCall(call, request.getTransaction().hasBegin());
@@ -860,7 +853,7 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
860853
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
861854
}
862855

863-
Map<SpannerRpc.Option, ?> getChannelHintOptions(
856+
static Map<SpannerRpc.Option, ?> getChannelHintOptions(
864857
Map<SpannerRpc.Option, ?> channelHintForSession, Long channelHintForTransaction) {
865858
if (channelHintForSession != null) {
866859
return channelHintForSession;
@@ -1030,7 +1023,8 @@ ResultSet readInternalWithOptions(
10301023
@Override
10311024
CloseableIterator<PartialResultSet> startStream(
10321025
@Nullable ByteString resumeToken,
1033-
AsyncResultSet.StreamMessageListener streamListener) {
1026+
AsyncResultSet.StreamMessageListener streamListener,
1027+
XGoogSpannerRequestId requestId) {
10341028
GrpcStreamIterator stream =
10351029
new GrpcStreamIterator(
10361030
lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
@@ -1048,13 +1042,12 @@ CloseableIterator<PartialResultSet> startStream(
10481042
builder.setTransaction(selector);
10491043
}
10501044
builder.setRequestOptions(buildRequestOptions(readOptions));
1051-
this.incrementXGoogRequestIdAttempt();
1052-
this.xGoogRequestId.setChannelId(session.getChannel());
10531045
SpannerRpc.StreamingCall call =
10541046
rpc.read(
10551047
builder.build(),
10561048
stream.consumer(),
1057-
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
1049+
getTransactionChannelHint(),
1050+
requestId,
10581051
isRouteToLeader());
10591052
session.markUsed(clock.instant());
10601053
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AdminRequestsPerMinuteExceededException.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,15 @@ public class AdminRequestsPerMinuteExceededException extends SpannerException {
3232
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
3333
AdminRequestsPerMinuteExceededException(
3434
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
35-
this(token, message, cause, null, null);
35+
this(token, message, cause, null);
3636
}
3737

3838
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
3939
AdminRequestsPerMinuteExceededException(
4040
DoNotConstructDirectly token,
4141
@Nullable String message,
4242
@Nullable Throwable cause,
43-
@Nullable ApiException apiException,
44-
@Nullable XGoogSpannerRequestId reqId) {
45-
super(token, ErrorCode.RESOURCE_EXHAUSTED, true, message, cause, apiException, reqId);
43+
@Nullable ApiException apiException) {
44+
super(token, ErrorCode.RESOURCE_EXHAUSTED, true, message, cause, apiException);
4645
}
4746
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,9 @@ private List<Partition> partitionReadUsingIndex(
250250
}
251251
builder.setPartitionOptions(pbuilder.build());
252252

253-
XGoogSpannerRequestId reqId =
254-
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
255253
final PartitionReadRequest request = builder.build();
256254
try {
257-
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
255+
PartitionResponse response = rpc.partitionRead(request, options);
258256
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
259257
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
260258
Partition partition =
@@ -274,7 +272,6 @@ private List<Partition> partitionReadUsingIndex(
274272
return partitionReadUsingIndex(
275273
partitionOptions, table, index, keys, columns, true, option);
276274
}
277-
e.setRequestId(reqId);
278275
throw e;
279276
}
280277
}
@@ -316,11 +313,9 @@ private List<Partition> partitionQuery(
316313
}
317314
builder.setPartitionOptions(pbuilder.build());
318315

319-
XGoogSpannerRequestId reqId =
320-
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
321316
final PartitionQueryRequest request = builder.build();
322317
try {
323-
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
318+
PartitionResponse response = rpc.partitionQuery(request, options);
324319
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
325320
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
326321
Partition partition =
@@ -333,7 +328,6 @@ private List<Partition> partitionQuery(
333328
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
334329
return partitionQuery(partitionOptions, statement, true, option);
335330
}
336-
e.setRequestId(reqId);
337331
throw e;
338332
}
339333
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsConstant.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID;
19+
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_HEADER_NAME;
2020

2121
import com.google.api.core.InternalApi;
2222
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
@@ -98,10 +98,12 @@ public class BuiltInMetricsConstant {
9898
AttributeKey.stringKey("directpath_enabled");
9999
public static final AttributeKey<String> DIRECT_PATH_USED_KEY =
100100
AttributeKey.stringKey("directpath_used");
101-
public static final AttributeKey<String> REQUEST_ID_KEY = AttributeKey.stringKey(REQUEST_ID);
101+
public static final AttributeKey<String> REQUEST_ID_KEY =
102+
AttributeKey.stringKey(REQUEST_ID_HEADER_NAME);
102103
public static final AttributeKey<String> GRPC_XDS_RESOURCE_TYPE_KEY =
103104
AttributeKey.stringKey("grpc.xds.resource_type");
104-
public static Set<String> ALLOWED_EXEMPLARS_ATTRIBUTES = new HashSet<>(Arrays.asList(REQUEST_ID));
105+
public static Set<String> ALLOWED_EXEMPLARS_ATTRIBUTES =
106+
new HashSet<>(Arrays.asList(REQUEST_ID_HEADER_NAME));
105107

106108
// IP address prefixes allocated for DirectPath backends.
107109
public static final String DP_IPV6_PREFIX = "2001:4860:8040";

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseNotFoundException.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class DatabaseNotFoundException extends ResourceNotFoundException {
3535
@Nullable String message,
3636
ResourceInfo resourceInfo,
3737
@Nullable Throwable cause) {
38-
this(token, message, resourceInfo, cause, null, null);
38+
this(token, message, resourceInfo, cause, null);
3939
}
4040

4141
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
@@ -44,8 +44,7 @@ public class DatabaseNotFoundException extends ResourceNotFoundException {
4444
@Nullable String message,
4545
ResourceInfo resourceInfo,
4646
@Nullable Throwable cause,
47-
@Nullable ApiException apiException,
48-
@Nullable XGoogSpannerRequestId reqId) {
49-
super(token, message, resourceInfo, cause, apiException, reqId);
47+
@Nullable ApiException apiException) {
48+
super(token, message, resourceInfo, cause, apiException);
5049
}
5150
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/InstanceNotFoundException.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class InstanceNotFoundException extends ResourceNotFoundException {
3535
@Nullable String message,
3636
ResourceInfo resourceInfo,
3737
@Nullable Throwable cause) {
38-
this(token, message, resourceInfo, cause, null, null);
38+
this(token, message, resourceInfo, cause, null);
3939
}
4040

4141
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
@@ -44,8 +44,7 @@ public class InstanceNotFoundException extends ResourceNotFoundException {
4444
@Nullable String message,
4545
ResourceInfo resourceInfo,
4646
@Nullable Throwable cause,
47-
@Nullable ApiException apiException,
48-
@Nullable XGoogSpannerRequestId reqId) {
49-
super(token, message, resourceInfo, cause, apiException, reqId);
47+
@Nullable ApiException apiException) {
48+
super(token, message, resourceInfo, cause, apiException);
5049
}
5150
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MissingDefaultSequenceKindException.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@ public class MissingDefaultSequenceKindException extends SpannerException {
3838
ErrorCode errorCode,
3939
String message,
4040
Throwable cause,
41-
@Nullable ApiException apiException,
42-
@Nullable XGoogSpannerRequestId reqId) {
43-
super(token, errorCode, /* retryable= */ false, message, cause, apiException, reqId);
41+
@Nullable ApiException apiException) {
42+
super(token, errorCode, /* retryable= */ false, message, cause, apiException);
4443
}
4544

4645
static boolean isMissingDefaultSequenceKindException(Throwable cause) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
745745

746746
@Override
747747
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
748-
return createMultiplexedSessionTransaction(/* singleUse= */ true)
748+
return createMultiplexedSessionTransaction(/* singleUse= */ false)
749749
.executePartitionedUpdate(stmt, options);
750750
}
751751

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import static com.google.cloud.spanner.AbstractReadContext.getChannelHintOptions;
1920
import static com.google.common.base.Preconditions.checkState;
2021

2122
import com.google.api.core.InternalApi;
@@ -42,6 +43,7 @@
4243
import java.time.Duration;
4344
import java.time.temporal.ChronoUnit;
4445
import java.util.Map;
46+
import java.util.concurrent.ThreadLocalRandom;
4547
import java.util.concurrent.TimeUnit;
4648
import java.util.logging.Level;
4749
import java.util.logging.Logger;
@@ -56,12 +58,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
5658
private final Ticker ticker;
5759
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
5860
private volatile boolean isValid = true;
61+
private final Map<SpannerRpc.Option, ?> channelHintOptions;
5962

6063
PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
6164
this.session = session;
6265
this.rpc = rpc;
6366
this.ticker = ticker;
6467
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
68+
this.channelHintOptions =
69+
getChannelHintOptions(
70+
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
6571
}
6672

6773
/**
@@ -79,23 +85,21 @@ long executeStreamingPartitionedUpdate(
7985
boolean foundStats = false;
8086
long updateCount = 0L;
8187
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
82-
XGoogSpannerRequestId reqId =
83-
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
84-
UpdateOption[] allOptions = new UpdateOption[updateOptions.length + 1];
85-
System.arraycopy(updateOptions, 0, allOptions, 0, updateOptions.length);
86-
allOptions[allOptions.length - 1] = new Options.RequestIdOption(reqId);
87-
Options options = Options.fromUpdateOptions(allOptions);
88+
Options options = Options.fromUpdateOptions(updateOptions);
8889

8990
try {
9091
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
92+
// The channel ID is set to zero here. It will be filled in later by SpannerRpc when it reads
93+
// the channel hint from the options that are passed in.
94+
XGoogSpannerRequestId requestId = this.session.getRequestIdCreator().nextRequestId(0);
9195

9296
while (true) {
9397
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
9498

9599
try {
96100
ServerStream<PartialResultSet> stream =
97101
rpc.executeStreamingPartitionedDml(
98-
request, reqId.withOptions(session.getOptions()), remainingTimeout);
102+
request, channelHintOptions, requestId, remainingTimeout);
99103

100104
for (PartialResultSet rs : stream) {
101105
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
@@ -110,40 +114,44 @@ long executeStreamingPartitionedUpdate(
110114
} catch (UnavailableException e) {
111115
LOGGER.log(
112116
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
113-
reqId.incrementAttempt();
114117
request = resumeOrRestartRequest(resumeToken, statement, request, options);
118+
if (resumeToken.isEmpty()) {
119+
// Create a new xGoogSpannerRequestId if there is no resume token, as that means that
120+
// the entire transaction will be retried.
121+
requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
122+
}
115123
} catch (InternalException e) {
116124
if (!isRetryableInternalErrorPredicate.apply(e)) {
117125
throw e;
118126
}
119127

120128
LOGGER.log(
121129
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
122-
reqId.incrementAttempt();
123130
request = resumeOrRestartRequest(resumeToken, statement, request, options);
131+
if (resumeToken.isEmpty()) {
132+
// Create a new xGoogSpannerRequestId if there is no resume token, as that means that
133+
// the entire transaction will be retried.
134+
requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
135+
}
124136
} catch (AbortedException e) {
125137
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
126138
resumeToken = ByteString.EMPTY;
127139
foundStats = false;
128140
updateCount = 0L;
129141
request = newTransactionRequestFrom(statement, options);
130142
// Create a new xGoogSpannerRequestId.
131-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
132-
} catch (SpannerException e) {
133-
e.setRequestId(reqId);
134-
throw e;
143+
requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
135144
}
136145
}
137146
if (!foundStats) {
138147
throw SpannerExceptionFactory.newSpannerException(
139148
ErrorCode.INVALID_ARGUMENT,
140-
"Partitioned DML response missing stats possibly due to non-DML statement as input",
141-
reqId);
149+
"Partitioned DML response missing stats possibly due to non-DML statement as input");
142150
}
143151
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
144152
return updateCount;
145153
} catch (Exception e) {
146-
throw SpannerExceptionFactory.newSpannerException(e, reqId);
154+
throw SpannerExceptionFactory.asSpannerException(e);
147155
}
148156
}
149157

@@ -223,14 +231,11 @@ private ByteString initTransaction(final Options options) {
223231
.setExcludeTxnFromChangeStreams(
224232
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
225233
.build();
226-
XGoogSpannerRequestId reqId =
227-
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
228-
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
234+
Transaction tx = rpc.beginTransaction(request, channelHintOptions, true);
229235
if (tx.getId().isEmpty()) {
230236
throw SpannerExceptionFactory.newSpannerException(
231237
ErrorCode.INTERNAL,
232-
"Failed to init transaction, missing transaction id\n" + session.getName(),
233-
reqId);
238+
"Failed to init transaction, missing transaction id\n" + session.getName());
234239
}
235240
return tx.getId();
236241
}

0 commit comments

Comments
 (0)