Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ASB Queue functionality to Brighter v9 #3174

Merged
merged 1 commit into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,42 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer
private readonly int _batchSize;
private IServiceBusReceiverWrapper _serviceBusReceiver;
private readonly string _subscriptionName;
private readonly bool _useQueues;
private bool _subscriptionCreated;
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<AzureServiceBusConsumer>();
private readonly OnMissingChannel _makeChannel;
private readonly AzureServiceBusSubscriptionConfiguration _subscriptionConfiguration;
private readonly ServiceBusReceiveMode _receiveMode;

/// <summary>
/// Initializes an Instance of <see cref="AzureServiceBusConsumer"/> that uses Queues
/// </summary>
/// <param name="queueName">The name of the Topic.</param>
/// <param name="messageProducerSync">An instance of the Messaging Producer used for Requeue.</param>
/// <param name="administrationClientWrapper">An Instance of Administration Client Wrapper.</param>
/// <param name="serviceBusReceiverProvider">An Instance of <see cref="ServiceBusReceiverProvider"/>.</param>
/// <param name="batchSize">How many messages to receive at a time.</param>
/// <param name="receiveMode">The mode in which to Receive.</param>
/// <param name="makeChannels">The mode in which to make Channels.</param>
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
public AzureServiceBusConsumer(string queueName, IAmAMessageProducerSync messageProducerSync,
IAdministrationClientWrapper administrationClientWrapper,
IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10,
ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default): this(messageProducerSync,
administrationClientWrapper, serviceBusReceiverProvider, batchSize, receiveMode, makeChannels,
subscriptionConfiguration)
{
_useQueues = true;
_topicName = queueName;

if (!_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();
}

/// <summary>
/// Initializes an Instance of <see cref="AzureServiceBusConsumer"/>
/// Initializes an Instance of <see cref="AzureServiceBusConsumer"/> that uses Topics
/// </summary>
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription on the Topic.</param>
Expand All @@ -43,20 +71,32 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName,
IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10,
ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default)
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default) : this(messageProducerSync,
administrationClientWrapper, serviceBusReceiverProvider, batchSize, receiveMode, makeChannels,
subscriptionConfiguration)
{
_subscriptionName = subscriptionName;
_topicName = topicName;
_useQueues = false;

if (!_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();
}

private AzureServiceBusConsumer(IAmAMessageProducerSync messageProducerSync,
IAdministrationClientWrapper administrationClientWrapper,
IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10,
ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default)
{
_messageProducerSync = messageProducerSync;
_administrationClientWrapper = administrationClientWrapper;
_serviceBusReceiverProvider = serviceBusReceiverProvider;
_batchSize = batchSize;
_makeChannel = makeChannels;
_subscriptionConfiguration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration();
_receiveMode = receiveMode;

if(!_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();
}

/// <summary>
Expand Down Expand Up @@ -235,7 +275,7 @@ public void Purge()
s_logger.LogInformation("Purging messages from {Subscription} Subscription on Topic {Topic}",
_subscriptionName, _topicName);

_administrationClientWrapper.DeleteTopicAsync(_topicName);
_administrationClientWrapper.DeleteChannelAsync(_topicName, _subscriptionConfiguration.UseServiceBusQueue);
EnsureSubscription();
}

Expand All @@ -256,7 +296,11 @@ private void GetMessageReceiverProvider()
_topicName, _subscriptionName, _receiveMode);
try
{
_serviceBusReceiver = _serviceBusReceiverProvider.Get(_topicName, _subscriptionName, _receiveMode, _subscriptionConfiguration.RequireSession);
_serviceBusReceiver = _useQueues
? _serviceBusReceiverProvider.Get(_topicName, _receiveMode,
_subscriptionConfiguration.RequireSession)
: _serviceBusReceiverProvider.Get(_topicName, _subscriptionName, _receiveMode,
_subscriptionConfiguration.RequireSession);
}
catch (Exception e)
{
Expand Down Expand Up @@ -334,7 +378,7 @@ private static int GetHandledCount(IBrokeredMessageWrapper azureServiceBusMessag

private void EnsureSubscription()
{
if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume))
if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume) || _useQueues)
return;

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,42 @@ public IAmAMessageConsumer Create(Subscription subscription)
var nameSpaceManagerWrapper = new AdministrationClientWrapper(_clientProvider);

var config = new AzureServiceBusSubscriptionConfiguration();
if (subscription is AzureServiceBusSubscription sub)

if (subscription is AzureServiceBusSubscription sub)
config = sub.Configuration;

return new AzureServiceBusConsumer(
subscription.RoutingKey,
subscription.ChannelName,
new AzureServiceBusMessageProducer(
return config.UseServiceBusQueue
? new AzureServiceBusConsumer(
subscription.RoutingKey,
new AzureServiceBusMessageProducer(
nameSpaceManagerWrapper,
new ServiceBusSenderProvider(_clientProvider),
new AzureServiceBusPublication
{
MakeChannels = subscription.MakeChannels, UseServiceBusQueue = config.UseServiceBusQueue
}),
nameSpaceManagerWrapper,
new ServiceBusReceiverProvider(_clientProvider),
makeChannels: subscription.MakeChannels,
receiveMode: _ackOnRead ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock,
batchSize: subscription.BufferSize,
subscriptionConfiguration: config)
: new AzureServiceBusConsumer(
subscription.RoutingKey,
subscription.ChannelName,
new AzureServiceBusMessageProducer(
nameSpaceManagerWrapper,
new ServiceBusSenderProvider(_clientProvider),
new AzureServiceBusPublication()
{
MakeChannels = subscription.MakeChannels, UseServiceBusQueue = config.UseServiceBusQueue
}),
nameSpaceManagerWrapper,
new ServiceBusSenderProvider(_clientProvider),
subscription.MakeChannels),
nameSpaceManagerWrapper,
new ServiceBusReceiverProvider(_clientProvider),
makeChannels: subscription.MakeChannels,
receiveMode: _ackOnRead ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock,
batchSize: subscription.BufferSize,
subscriptionConfiguration: config);
new ServiceBusReceiverProvider(_clientProvider),
makeChannels: subscription.MakeChannels,
receiveMode: _ackOnRead ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock,
batchSize: subscription.BufferSize,
subscriptionConfiguration: config);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,38 @@ public class AzureServiceBusMessageProducer : IAmAMessageProducerSync, IAmAMessa
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<AzureServiceBusMessageProducer>();
private const int TopicConnectionSleepBetweenRetriesInMilliseconds = 100;
private const int TopicConnectionRetryCount = 5;
private readonly OnMissingChannel _makeChannel;
private readonly int _bulkSendBatchSize;

private readonly AzureServiceBusPublication _publication;

/// <summary>
/// An Azure Service Bus Message producer <see cref="IAmAMessageProducer"/>
/// </summary>
/// <param name="administrationClientWrapper">The administrative client.</param>
/// <param name="serviceBusSenderProvider">The provider to use when producing messages.</param>
/// <param name="makeChannel">Behaviour to use when verifying Channels <see cref="OnMissingChannel"/>.</param>
/// <param name="bulkSendBatchSize">When sending more than one message using the MessageProducer, the max amount to send in a single transmission.</param>
[Obsolete("Please provide publication instead of OnMissingChannel. Removed in v10")]
public AzureServiceBusMessageProducer(IAdministrationClientWrapper administrationClientWrapper, IServiceBusSenderProvider serviceBusSenderProvider, OnMissingChannel makeChannel = OnMissingChannel.Create, int bulkSendBatchSize = 10)
{
_administrationClientWrapper = administrationClientWrapper;
_serviceBusSenderProvider = serviceBusSenderProvider;
_makeChannel = makeChannel;
_publication = new AzureServiceBusPublication() { MakeChannels = makeChannel };
_bulkSendBatchSize = bulkSendBatchSize;
}

/// <summary>
/// An Azure Service Bus Message producer <see cref="IAmAMessageProducer"/>
/// </summary>
/// <param name="administrationClientWrapper">The administrative client.</param>
/// <param name="serviceBusSenderProvider">The provider to use when producing messages.</param>
/// <param name="publication">The Service Bus publication settings</param>
/// <param name="bulkSendBatchSize">When sending more than one message using the MessageProducer, the max amount to send in a single transmission.</param>
public AzureServiceBusMessageProducer(IAdministrationClientWrapper administrationClientWrapper, IServiceBusSenderProvider serviceBusSenderProvider, AzureServiceBusPublication publication, int bulkSendBatchSize = 10)
{
_administrationClientWrapper = administrationClientWrapper;
_serviceBusSenderProvider = serviceBusSenderProvider;
_publication = publication;
_bulkSendBatchSize = bulkSendBatchSize;
}

Expand Down Expand Up @@ -258,23 +275,23 @@ private ServiceBusMessage ConvertToServiceBusMessage(Message message)

private void EnsureTopicExists(string topic)
{
if (_topicCreated || _makeChannel.Equals(OnMissingChannel.Assume))
if (_topicCreated || _publication.MakeChannels.Equals(OnMissingChannel.Assume))
return;

try
{
if (_administrationClientWrapper.TopicExists(topic))
if (_administrationClientWrapper.TopicOrQueueExists(topic, _publication.UseServiceBusQueue))
{
_topicCreated = true;
return;
}

if (_makeChannel.Equals(OnMissingChannel.Validate))
if (_publication.MakeChannels.Equals(OnMissingChannel.Validate))
{
throw new ChannelFailureException($"Topic {topic} does not exist and missing channel mode set to Validate.");
}

_administrationClientWrapper.CreateTopic(topic);
_administrationClientWrapper.CreateChannel(topic, _publication.UseServiceBusQueue);
_topicCreated = true;
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static AzureServiceBusMessageProducer Get(
var nameSpaceManagerWrapper = new AdministrationClientWrapper(clientProvider);
var topicClientProvider = new ServiceBusSenderProvider(clientProvider);

return new AzureServiceBusMessageProducer(nameSpaceManagerWrapper, topicClientProvider, asbPublication.MakeChannels, bulkSendBatchSize);
return new AzureServiceBusMessageProducer(nameSpaceManagerWrapper, topicClientProvider, asbPublication, bulkSendBatchSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@
public class AzureServiceBusPublication : Publication
{
//TODO: Placeholder for producer specific properties if required

/// <summary>
/// Use a Service Bus Queue instead of a Topic
/// </summary>
public bool UseServiceBusQueue = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@ public class AzureServiceBusSubscriptionConfiguration
/// A Sql Filter to apply to the subscription
/// </summary>
public string SqlFilter = String.Empty;

/// <summary>
/// Use a Service Bus Queue instead of a Topic
/// </summary>
public bool UseServiceBusQueue = false;
}
}
Loading
Loading