Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using static Google.Cloud.PubSub.V1.SubscriberClient;

// Tests create quite a few tasks that don't need awaiting.
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Expand Down Expand Up @@ -159,7 +160,12 @@ private async Task RunBulkMessagingImpl(
{
// Test finished, so stop subscriber
Console.WriteLine("All msgs received, stopping subscriber.");
Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15));
var shutdownOptions = new ShutdownOptions
{
Mode = ShutdownMode.NackImmediately,
Timeout = TimeSpan.FromSeconds(15)
};
Task unused = subscriber.StopAsync(shutdownOptions);
}
}
else
Expand Down Expand Up @@ -194,11 +200,15 @@ private async Task RunBulkMessagingImpl(
{
if (noProgressCount > 60)
{
// Deadlock, shutdown subscriber, and cancel
Console.WriteLine("Deadlock detected. Cancelling test");
subscriber.StopAsync(new CancellationToken(true));
watchdogCts.Cancel();
break;
// Deadlock, shutdown subscriber, and cancel
Console.WriteLine("Deadlock detected. Cancelling test");
var shutdownOptions = new ShutdownOptions
{
Mode = ShutdownMode.NackImmediately
};
subscriber.StopAsync(shutdownOptions, cancellationToken: new CancellationToken(true));
watchdogCts.Cancel();
break;
}
noProgressCount += 1;
}
Expand Down Expand Up @@ -431,7 +441,12 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
});
await Task.Delay(subscriberLifetime);
Console.WriteLine("Stopping subscriber");
Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15));
var shutdownOptions = new ShutdownOptions
{
Mode = ShutdownMode.NackImmediately,
Timeout = TimeSpan.FromSeconds(15)
};
Task stopTask = subscriber.StopAsync(shutdownOptions);
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
await Task.WhenAll(subscribeTask, stopTask);
int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count);
Expand Down Expand Up @@ -538,8 +553,13 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest
{
result.Add((msg.GetDeliveryAttempt(), true));
// Received DLQ message, so stop test.
sub.StopAsync(TimeSpan.FromSeconds(10));
dlqSub.StopAsync(TimeSpan.FromSeconds(10));
var shutdownOptions = new ShutdownOptions
{
Mode = ShutdownMode.NackImmediately,
Timeout = TimeSpan.FromSeconds(10)
};
sub.StopAsync(shutdownOptions);
dlqSub.StopAsync(shutdownOptions);
return Task.FromResult(SubscriberClient.Reply.Ack);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Task Subscribe()
if (recvCount == inputLines.Count)
{
Console.WriteLine("Received all messages, shutting down");
var dummyTask = sub.StopAsync(CancellationToken.None);
var dummyTask = sub.StopAsync(new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately});
}
}
if (rnd.Next(3) == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ await _subscriberClient.StartAsync((msg, token) =>
return Task.FromResult(SubscriberClient.Reply.Ack);
});

public override async Task StopAsync(CancellationToken stoppingToken) =>
await _subscriberClient.StopAsync(stoppingToken);
public override async Task StopAsync(CancellationToken stoppingToken)
{
var shutdownOptions = new SubscriberClient.ShutdownOptions {Mode = SubscriberClient.ShutdownMode.NackImmediately};
await _subscriberClient.StopAsync(shutdownOptions, cancellationToken: stoppingToken);
}
}
// End sample
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ await subscriber.StartAsync((msg, cancellationToken) =>
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
// Stop this subscriber after one message is received.
// This is non-blocking, and the returned Task may be awaited.
subscriber.StopAsync(TimeSpan.FromSeconds(15));
subscriber.StopAsync(new SubscriberClient.ShutdownOptions
{
Mode = SubscriberClient.ShutdownMode.NackImmediately,
Timeout =TimeSpan.FromSeconds(15)
});
// Return Reply.Ack to indicate this message has been handled.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
Expand Down
Loading
Loading