diff --git a/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs b/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs index f3c3b4785..157cbb95f 100644 --- a/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs +++ b/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs @@ -2,5 +2,8 @@ namespace Dfe.PlanTech.Domain.ServiceBus.Models; public record ServiceBusOptions { - public int MessagesPerBatch { get; init; } = 10; + /// + /// Enables reading + processing of messages from the Service Bus + /// + public bool EnableQueueReading { get; init; } = true; } diff --git a/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs b/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs index c1a3219c5..35701366a 100644 --- a/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs +++ b/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs @@ -2,11 +2,13 @@ using Azure.Messaging.ServiceBus; using Dfe.PlanTech.Application.Persistence.Commands; using Dfe.PlanTech.Domain.Persistence.Interfaces; +using Dfe.PlanTech.Domain.ServiceBus.Models; using Dfe.PlanTech.Infrastructure.ServiceBus.Results; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Dfe.PlanTech.Infrastructure.ServiceBus; @@ -20,7 +22,8 @@ namespace Dfe.PlanTech.Infrastructure.ServiceBus; public class ContentfulServiceBusProcessor(IAzureClientFactory processorFactory, IServiceBusResultProcessor resultProcessor, ILogger logger, - IServiceScopeFactory serviceScopeFactory) : BackgroundService + IServiceScopeFactory serviceScopeFactory, + IOptions options) : BackgroundService { private readonly ServiceBusProcessor _processor = processorFactory.CreateClient("contentfulprocessor"); @@ -29,12 +32,17 @@ public class ContentfulServiceBusProcessor(IAzureClientFactory protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + if (!options.Value.EnableQueueReading) + { + logger.LogInformation("{QueueReadingProperty} is set to disabling - not enabling processing queue reading", nameof(options.Value.EnableQueueReading)); + return; + } _processor.ProcessMessageAsync += MessageHandler; _processor.ProcessErrorAsync += ErrorHandler; await _processor.StartProcessingAsync(stoppingToken); - stoppingToken.Register(async () => await StopProcessingAsync()); + stoppingToken.Register(() => StopProcessingAsync().GetAwaiter().GetResult()); } /// diff --git a/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs b/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs index e9c13fa7a..c3838d311 100644 --- a/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs +++ b/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs @@ -58,7 +58,7 @@ private static IServiceCollection AddServiceBusServices(this IServiceCollection services.AddTransient(); services.AddTransient(); - services.AddSingleton(new ServiceBusOptions() { MessagesPerBatch = 10 }); + services.Configure(configuration.GetSection(nameof(ServiceBusOptions))); return services; } diff --git a/src/Dfe.PlanTech.Web/appsettings.Development.json b/src/Dfe.PlanTech.Web/appsettings.Development.json index f95544f91..0ea7f52dd 100644 --- a/src/Dfe.PlanTech.Web/appsettings.Development.json +++ b/src/Dfe.PlanTech.Web/appsettings.Development.json @@ -18,5 +18,8 @@ "GTM": { "Id": "GTM-MFLR9ZK", "SiteVerificationId": "rhK9PD6EMeai5M5a3qySWTZTxwZxyzUjU2fifZ_ezjs" + }, + "ServiceBusOptions": { + "EnableQueueReading": true } } diff --git a/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs b/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs index 4b030ac28..209327de4 100644 --- a/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs +++ b/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using NSubstitute; using NSubstitute.ExceptionExtensions; @@ -21,6 +22,7 @@ public class ContentfulServiceBusProcessorTests private readonly ILogger _logger = Substitute.For>(); private readonly IWebhookToDbCommand _webhookToDbCommand = Substitute.For(); private readonly IServiceScopeFactory _serviceScopeFactory = Substitute.For(); + private readonly IOptions _options = Substitute.For>(); private readonly IServiceBusResultProcessor _serviceBusResultProcessor; private readonly ContentfulServiceBusProcessor _contentfulServiceBusProcessor; @@ -36,7 +38,19 @@ public ContentfulServiceBusProcessorTests() _serviceBusResultProcessor = Substitute.For(); _processorFactory.CreateClient("contentfulprocessor").Returns(_serviceBusProcessor); - _contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory); + _options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = true }); + _contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options); + } + + [Fact] + public async Task EnableQueueReading_Should_PreventQueueProcessing_When_False() + { + _options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = false }); + var contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options); + + await contentfulServiceBusProcessor.InvokeNonPublicAsyncMethod("ExecuteAsync", [CancellationToken.None]); + + Assert.Empty(_serviceBusProcessor.ReceivedCalls()); } [Fact]