Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,10 @@ public class ObservabilityAttributes {

/** The url template of the request (e.g. /v1/{name}:access). */
public static final String URL_TEMPLATE_ATTRIBUTE = "url.template";

/** The resend count of the request. Only used in HTTP transport. */
public static final String HTTP_RESEND_COUNT_ATTRIBUTE = "http.request.resend_count";

/** The resend count of the request. Only used in gRPC transport. */
public static final String GRPC_RESEND_COUNT_ATTRIBUTE = "gcp.grpc.resend_count";
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ static Attributes toOtelAttributes(Map<String, Object> attributes) {
(k, v) -> {
if (v instanceof String) {
attributesBuilder.put(k, (String) v);
} else if (v instanceof Long) {
attributesBuilder.put(k, (Long) v);
} else if (v instanceof Integer) {
attributesBuilder.put(k, (long) (Integer) v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class SpanTracer implements ApiTracer {
private final String attemptSpanName;
private final ApiTracerContext apiTracerContext;
private TraceManager.Span attemptHandle;
private long resendCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use AtomicInteger instead? There should be only one thread modifying the value, but in case the retries happen in a different thread, the number may not be immediately visible to other thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do


/**
* Creates a new instance of {@code SpanTracer}.
Expand All @@ -65,6 +66,7 @@ public SpanTracer(
this.attemptSpanName = attemptSpanName;
this.apiTracerContext = apiTracerContext;
this.attemptAttributes = new HashMap<>();
this.resendCount = 0;
buildAttributes();
}

Expand All @@ -76,15 +78,48 @@ private void buildAttributes() {
@Override
public void attemptStarted(Object request, int attemptNumber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we verify that if we can reuse attemptNumber?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I'll do a quick research and provide info in the PR description.

Map<String, Object> attemptAttributes = new HashMap<>(this.attemptAttributes);

if (this.resendCount > 0) {
ApiTracerContext.Transport transport = apiTracerContext.transport();
if (transport == ApiTracerContext.Transport.GRPC) {
attemptAttributes.put(
ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE, this.resendCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This attribute should starting at 1 for the first retry per requirements doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are handling that requirement in line 82:

if (this.resendCount > 0) {...}

} else if (transport == ApiTracerContext.Transport.HTTP) {
attemptAttributes.put(
ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE, this.resendCount);
}
}

// Start the specific attempt span with the operation span as parent
this.attemptHandle = traceManager.createSpan(attemptSpanName, attemptAttributes);
this.resendCount++;
}

@Override
public void attemptSucceeded() {
endAttempt();
}

@Override
public void attemptCancelled() {
endAttempt();
}

@Override
public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
endAttempt();
}

@Override
public void attemptFailedRetriesExhausted(Throwable error) {
endAttempt();
}

@Override
public void attemptPermanentFailure(Throwable error) {
endAttempt();
}

private void endAttempt() {
if (attemptHandle != null) {
attemptHandle.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,90 @@ void testAttemptStarted_includesLanguageAttribute() {
assertThat(attributesCaptor.getValue())
.containsEntry(SpanTracer.LANGUAGE_ATTRIBUTE, SpanTracer.DEFAULT_LANGUAGE);
}

@Test
void testAttemptStarted_retryAttributes_grpc() {
ApiTracerContext grpcContext =
ApiTracerContext.newBuilder()
.setLibraryMetadata(com.google.api.gax.rpc.LibraryMetadata.empty())
.setTransport(ApiTracerContext.Transport.GRPC)
.build();
SpanTracer grpcTracer = new SpanTracer(recorder, grpcContext, ATTEMPT_SPAN_NAME);

// First attempt, no retry attribute
grpcTracer.attemptStarted(new Object(), 0);
ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
verify(recorder).createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
assertThat(attributesCaptor.getValue())
.doesNotContainKey(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE);
assertThat(attributesCaptor.getValue())
.doesNotContainKey(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE);

// First retry
grpcTracer.attemptStarted(new Object(), 0);
verify(recorder, org.mockito.Mockito.times(2))
.createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
Map<String, Object> capturedAttributes = (Map<String, Object>) attributesCaptor.getValue();
assertThat(capturedAttributes)
.containsEntry(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE, 1L);
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE);

// N-th retry
grpcTracer.attemptStarted(new Object(), 0);
grpcTracer.attemptStarted(new Object(), 0);
grpcTracer.attemptStarted(new Object(), 0);
grpcTracer.attemptStarted(new Object(), 0);
verify(recorder, org.mockito.Mockito.times(6))
.createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
capturedAttributes = (Map<String, Object>) attributesCaptor.getValue();
assertThat(capturedAttributes)
.containsEntry(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE, 5L);
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE);
}

@Test
void testAttemptStarted_retryAttributes_http() {
ApiTracerContext httpContext =
ApiTracerContext.newBuilder()
.setLibraryMetadata(com.google.api.gax.rpc.LibraryMetadata.empty())
.setTransport(ApiTracerContext.Transport.HTTP)
.build();
SpanTracer httpTracer = new SpanTracer(recorder, httpContext, ATTEMPT_SPAN_NAME);
ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);

// First attempt, no retry attribute
httpTracer.attemptStarted(new Object(), 0);
verify(recorder, org.mockito.Mockito.times(1))
.createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
Map<String, Object> capturedAttributes = (Map<String, Object>) attributesCaptor.getValue();
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE);
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE);

// First retry
httpTracer.attemptStarted(new Object(), 0);
verify(recorder, org.mockito.Mockito.times(2))
.createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
capturedAttributes = (Map<String, Object>) attributesCaptor.getValue();
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE);
assertThat(capturedAttributes)
.containsEntry(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE, 1L);

// N-th retry
httpTracer.attemptStarted(new Object(), 0);
httpTracer.attemptStarted(new Object(), 0);
httpTracer.attemptStarted(new Object(), 0);
httpTracer.attemptStarted(new Object(), 0);
verify(recorder, org.mockito.Mockito.times(6))
.createSpan(eq(ATTEMPT_SPAN_NAME), attributesCaptor.capture());
capturedAttributes = (Map<String, Object>) attributesCaptor.getValue();
assertThat(capturedAttributes)
.doesNotContainKey(ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE);
assertThat(capturedAttributes)
.containsEntry(ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE, 5L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,24 @@
package com.google.showcase.v1beta1.it;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnavailableException;
import com.google.api.gax.tracing.ObservabilityAttributes;
import com.google.api.gax.tracing.OpenTelemetryTraceManager;
import com.google.api.gax.tracing.SpanTracer;
import com.google.api.gax.tracing.SpanTracerFactory;
import com.google.rpc.Status;
import com.google.showcase.v1beta1.EchoClient;
import com.google.showcase.v1beta1.EchoRequest;
import com.google.showcase.v1beta1.EchoSettings;
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
import com.google.showcase.v1beta1.stub.EchoStub;
import com.google.showcase.v1beta1.stub.EchoStubSettings;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
Expand Down Expand Up @@ -196,4 +206,148 @@ void testTracing_successfulEcho_httpjson() throws Exception {
.isEqualTo("v1beta1/echo:echo");
}
}

@Test
void testTracing_retry_grpc() throws Exception {
final int attempts = 5;
final StatusCode.Code statusCode = StatusCode.Code.UNAVAILABLE;
// A custom EchoClient is used in this test because retries have jitter, and we cannot
// predict the number of attempts that are scheduled for an RPC invocation otherwise.
// The custom retrySettings limit to a set number of attempts before the call gives up.
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMillis(5000L))
.setMaxAttempts(attempts)
.build();

EchoStubSettings.Builder grpcEchoSettingsBuilder = EchoStubSettings.newBuilder();
grpcEchoSettingsBuilder
.echoSettings()
.setRetrySettings(retrySettings)
.setRetryableCodes(statusCode);
EchoSettings grpcEchoSettings = EchoSettings.create(grpcEchoSettingsBuilder.build());
grpcEchoSettings =
grpcEchoSettings.toBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(EchoSettings.defaultGrpcTransportProviderBuilder().build())
.setEndpoint("localhost:7469")
.build();

SpanTracerFactory tracingFactory =
new SpanTracerFactory(new OpenTelemetryTraceManager(openTelemetrySdk));

EchoStubSettings echoStubSettings =
(EchoStubSettings)
grpcEchoSettings.getStubSettings().toBuilder().setTracerFactory(tracingFactory).build();
EchoStub stub = echoStubSettings.createStub();
EchoClient grpcClient = EchoClient.create(stub);

EchoRequest echoRequest =
EchoRequest.newBuilder()
.setError(Status.newBuilder().setCode(statusCode.ordinal()).build())
.build();

assertThrows(UnavailableException.class, () -> grpcClient.echo(echoRequest));

List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertThat(spans).hasSize(attempts); // Expect exactly one span for the successful retry

// This single span represents the successful retry, which has resend_count=1
// The first attempt has no resend_count. The subsequent retries will have a resend_count,
// starting from 1.
List<Long> resendCounts =
spans.stream()
.map(
span ->
(Long)
span.getAttributes()
.asMap()
.get(
AttributeKey.longKey(
ObservabilityAttributes.GRPC_RESEND_COUNT_ATTRIBUTE)))
.filter(java.util.Objects::nonNull)
.sorted()
.collect(java.util.stream.Collectors.toList());

List<Long> expectedCounts =
java.util.stream.LongStream.range(1, attempts)
.boxed()
.collect(java.util.stream.Collectors.toList());
assertThat(resendCounts).containsExactlyElementsIn(expectedCounts).inOrder();
}

@Test
void testTracing_retry_httpjson() throws Exception {
final int attempts = 5;
final StatusCode.Code statusCode = StatusCode.Code.UNAVAILABLE;
// A custom EchoClient is used in this test because retries have jitter, and we cannot
// predict the number of attempts that are scheduled for an RPC invocation otherwise.
// The custom retrySettings limit to a set number of attempts before the call gives up.
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMillis(5000L))
.setMaxAttempts(attempts)
.build();

EchoStubSettings.Builder httpJsonEchoSettingsBuilder = EchoStubSettings.newHttpJsonBuilder();
httpJsonEchoSettingsBuilder
.echoSettings()
.setRetrySettings(retrySettings)
.setRetryableCodes(statusCode);
EchoSettings httpJsonEchoSettings = EchoSettings.create(httpJsonEchoSettingsBuilder.build());
httpJsonEchoSettings =
httpJsonEchoSettings.toBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(
EchoSettings.defaultHttpJsonTransportProviderBuilder()
.setHttpTransport(
new NetHttpTransport.Builder().doNotValidateCertificate().build())
.setEndpoint("http://localhost:7469")
.build())
.build();

SpanTracerFactory tracingFactory =
new SpanTracerFactory(new OpenTelemetryTraceManager(openTelemetrySdk));

EchoStubSettings echoStubSettings =
(EchoStubSettings)
httpJsonEchoSettings.getStubSettings().toBuilder()
.setTracerFactory(tracingFactory)
.build();
EchoStub stub = echoStubSettings.createStub();
EchoClient httpClient = EchoClient.create(stub);

EchoRequest echoRequest =
EchoRequest.newBuilder()
.setError(Status.newBuilder().setCode(statusCode.ordinal()).build())
.build();

assertThrows(UnavailableException.class, () -> httpClient.echo(echoRequest));

List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertThat(spans).hasSize(attempts); // Expect exactly one span for the successful retry

// This single span represents the successful retry, which has resend_count=1
// The first attempt has no resend_count. The subsequent retries will have a resend_count,
// starting from 1.
List<Long> resendCounts =
spans.stream()
.map(
span ->
(Long)
span.getAttributes()
.asMap()
.get(
AttributeKey.longKey(
ObservabilityAttributes.HTTP_RESEND_COUNT_ATTRIBUTE)))
.filter(java.util.Objects::nonNull)
.sorted()
.collect(java.util.stream.Collectors.toList());

List<Long> expectedCounts =
java.util.stream.LongStream.range(1, attempts)
.boxed()
.collect(java.util.stream.Collectors.toList());
assertThat(resendCounts).containsExactlyElementsIn(expectedCounts).inOrder();
}
}
Loading