Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,79 @@ public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy()
for (int i = 0; i < attempts; i++)
{
var queue = await managementClient.GetQueue(queueName);
var deliveryLimit = queue.GetDeliveryLimit();

if (queue.DeliveryLimit == -1 && queue.AppliedPolicyName == policyName)
if (deliveryLimit == Queue.BigValueInsteadOfActuallyUnlimited && queue.AppliedPolicyName == policyName)
{
// Policy applied successfully
return;
}

await Task.Delay(TimeSpan.FromSeconds(3));
}

Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}'.");
}

[Test]
public async Task ValidateDeliveryLimit_Should_Update_Old_Unlimited_Policy_Created_By_Transport()
{
var managementClient = new ManagementClient(connectionConfiguration);
using var brokerVerifier = new BrokerVerifier(managementClient, BrokerRequirementChecks.None, true);
await brokerVerifier.Initialize();

if (brokerVerifier.BrokerVersion < BrokerVerifier.BrokerVersion4)
{
Assert.Ignore("Test not valid for broker versions before 4.0.0");
}

var queueName = nameof(ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy);
var policyName = $"nsb.{queueName}.delivery-limit";

var oldUnlimitedPolicy = new Policy
{
ApplyTo = PolicyTarget.QuorumQueues,
Definition = new PolicyDefinition { DeliveryLimit = -1 },
Pattern = queueName,
Priority = 0
};

await CreateQueue(queueName);
await managementClient.CreatePolicy(policyName, oldUnlimitedPolicy);

// It can take some time for updated policies to be applied, so we need to wait.
// If this test is randomly failing, consider increasing the attempts
var attempts = 20;

int deliveryLimit = 0;

for (int i = 0; i < attempts; i++)
{
var queue = await managementClient.GetQueue(queueName);
deliveryLimit = queue.GetDeliveryLimit();

if (deliveryLimit == -1 && queue.AppliedPolicyName == policyName)
{
// Policy applied successfully
break;
}

await Task.Delay(TimeSpan.FromSeconds(3));
}

if (deliveryLimit != -1)
{
Assert.Fail($"Old unlimited Policy '{policyName}' was not applied to queue '{queueName}'.");
}

await brokerVerifier.ValidateDeliveryLimit(queueName);

for (int i = 0; i < attempts; i++)
{
var queue = await managementClient.GetQueue(queueName);
deliveryLimit = queue.GetDeliveryLimit();

if (deliveryLimit == Queue.BigValueInsteadOfActuallyUnlimited && queue.AppliedPolicyName == policyName)
{
// Policy applied successfully
return;
Expand Down Expand Up @@ -124,7 +195,7 @@ public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_

await managementClient.CreatePolicy(policyName, policy).ConfigureAwait(false);

// If this test appears flaky, the delay should be increased to give the broker more time to apply the policy before calling ValidateDeliveryLimit
// If this test appears flaky, the delay should be increased to give the broker more time to apply the oldUnlimitedPolicy before calling ValidateDeliveryLimit
await Task.Delay(TimeSpan.FromSeconds(30));

var exception = Assert.ThrowsAsync<InvalidOperationException>(async () => await brokerVerifier.ValidateDeliveryLimit(queueName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,16 @@ bool ShouldOverrideDeliveryLimit(Queue queue)

var limit = queue.GetDeliveryLimit();

if (limit == -1)
if (limit is Queue.BigValueInsteadOfActuallyUnlimited)
{
return false;
}

if (limit == -1 && queue.AppliedPolicyName == $"nsb.{queue.Name}.delivery-limit")
{
return true;
}

if (queue.Arguments.DeliveryLimit.HasValue || (queue.EffectivePolicyDefinition?.DeliveryLimit.HasValue ?? false))
{
throw new InvalidOperationException($"The delivery limit for '{queue.Name}' is set to the non-default value of '{limit}'. Remove any delivery limit settings from queue arguments, user policies or operator policies to correct this.");
Expand All @@ -191,17 +196,17 @@ async Task SetDeliveryLimitViaPolicy(Queue queue, CancellationToken cancellation
throw new InvalidOperationException($"Cannot create unlimited delivery limit policies in RabbitMQ versions prior to 4.0. The version is: {brokerVersion}.");
}

if (!string.IsNullOrEmpty(queue.AppliedPolicyName))
var policyName = $"nsb.{queue.Name}.delivery-limit";

if (!string.IsNullOrEmpty(queue.AppliedPolicyName) && queue.AppliedPolicyName != policyName)
{
throw new InvalidOperationException($"An unlimited delivery limit policy cannot be applied to the '{queue.Name}' queue because it already has a '{queue.AppliedPolicyName}' policy applied.");
}

var policyName = $"nsb.{queue.Name}.delivery-limit";

var policy = new Policy
{
ApplyTo = PolicyTarget.QuorumQueues,
Definition = new PolicyDefinition { DeliveryLimit = -1 },
Definition = new PolicyDefinition { DeliveryLimit = Queue.BigValueInsteadOfActuallyUnlimited },
Pattern = queue.Name,
Priority = 0
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public int GetDeliveryLimit()
}

// RabbitMQ 3.x
// The broker doesn't tell us what the actual delivery limit is, so we have to figure it out
// We have to find the lowest value from the possible places in can be configured
// The broker doesn't tell us what the actual delivery limit is, so we have to figure it out.
// We have to find the lowest value from the possible places it can be configured.

int? limit = null;

Expand All @@ -67,6 +67,8 @@ public int GetDeliveryLimit()
limit = 0;
}

return limit ?? -1;
return limit ?? BigValueInsteadOfActuallyUnlimited;
}

public const int BigValueInsteadOfActuallyUnlimited = 100_000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we are trying to be as limited as possible since this is a "bug" fix, however, should we consider letting the user configure this? I can see an argument for not doing so, in that NSB should never reach this number of retries. This number being a "problem" presupposes a bug in RabbitMQ transport or NServiceBus core, or misconfigured recoverability settings.

}