diff --git a/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs b/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs
index f3c3b4785..157cbb95f 100644
--- a/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs
+++ b/src/Dfe.PlanTech.Domain/ServiceBus/Models/ServiceBusOptions.cs
@@ -2,5 +2,8 @@ namespace Dfe.PlanTech.Domain.ServiceBus.Models;
public record ServiceBusOptions
{
- public int MessagesPerBatch { get; init; } = 10;
+ ///
+ /// Enables reading + processing of messages from the Service Bus
+ ///
+ public bool EnableQueueReading { get; init; } = true;
}
diff --git a/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs b/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs
index c1a3219c5..35701366a 100644
--- a/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs
+++ b/src/Dfe.PlanTech.Infrastructure.ServiceBus/ContentfulServiceBusProcessor.cs
@@ -2,11 +2,13 @@
using Azure.Messaging.ServiceBus;
using Dfe.PlanTech.Application.Persistence.Commands;
using Dfe.PlanTech.Domain.Persistence.Interfaces;
+using Dfe.PlanTech.Domain.ServiceBus.Models;
using Dfe.PlanTech.Infrastructure.ServiceBus.Results;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace Dfe.PlanTech.Infrastructure.ServiceBus;
@@ -20,7 +22,8 @@ namespace Dfe.PlanTech.Infrastructure.ServiceBus;
public class ContentfulServiceBusProcessor(IAzureClientFactory processorFactory,
IServiceBusResultProcessor resultProcessor,
ILogger logger,
- IServiceScopeFactory serviceScopeFactory) : BackgroundService
+ IServiceScopeFactory serviceScopeFactory,
+ IOptions options) : BackgroundService
{
private readonly ServiceBusProcessor _processor = processorFactory.CreateClient("contentfulprocessor");
@@ -29,12 +32,17 @@ public class ContentfulServiceBusProcessor(IAzureClientFactory
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
+ if (!options.Value.EnableQueueReading)
+ {
+ logger.LogInformation("{QueueReadingProperty} is set to disabling - not enabling processing queue reading", nameof(options.Value.EnableQueueReading));
+ return;
+ }
_processor.ProcessMessageAsync += MessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;
await _processor.StartProcessingAsync(stoppingToken);
- stoppingToken.Register(async () => await StopProcessingAsync());
+ stoppingToken.Register(() => StopProcessingAsync().GetAwaiter().GetResult());
}
///
diff --git a/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs b/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs
index e9c13fa7a..c3838d311 100644
--- a/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs
+++ b/src/Dfe.PlanTech.Infrastructure.ServiceBus/DependencyInjection.cs
@@ -58,7 +58,7 @@ private static IServiceCollection AddServiceBusServices(this IServiceCollection
services.AddTransient();
services.AddTransient();
- services.AddSingleton(new ServiceBusOptions() { MessagesPerBatch = 10 });
+ services.Configure(configuration.GetSection(nameof(ServiceBusOptions)));
return services;
}
diff --git a/src/Dfe.PlanTech.Web/appsettings.Development.json b/src/Dfe.PlanTech.Web/appsettings.Development.json
index f95544f91..0ea7f52dd 100644
--- a/src/Dfe.PlanTech.Web/appsettings.Development.json
+++ b/src/Dfe.PlanTech.Web/appsettings.Development.json
@@ -18,5 +18,8 @@
"GTM": {
"Id": "GTM-MFLR9ZK",
"SiteVerificationId": "rhK9PD6EMeai5M5a3qySWTZTxwZxyzUjU2fifZ_ezjs"
+ },
+ "ServiceBusOptions": {
+ "EnableQueueReading": true
}
}
diff --git a/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs b/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs
index 4b030ac28..209327de4 100644
--- a/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs
+++ b/tests/Dfe.PlanTech.Infrastructure.ServiceBus.UnitTests/ContentfulServiceBusProcessorTests.cs
@@ -7,6 +7,7 @@
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
@@ -21,6 +22,7 @@ public class ContentfulServiceBusProcessorTests
private readonly ILogger _logger = Substitute.For>();
private readonly IWebhookToDbCommand _webhookToDbCommand = Substitute.For();
private readonly IServiceScopeFactory _serviceScopeFactory = Substitute.For();
+ private readonly IOptions _options = Substitute.For>();
private readonly IServiceBusResultProcessor _serviceBusResultProcessor;
private readonly ContentfulServiceBusProcessor _contentfulServiceBusProcessor;
@@ -36,7 +38,19 @@ public ContentfulServiceBusProcessorTests()
_serviceBusResultProcessor = Substitute.For();
_processorFactory.CreateClient("contentfulprocessor").Returns(_serviceBusProcessor);
- _contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory);
+ _options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = true });
+ _contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options);
+ }
+
+ [Fact]
+ public async Task EnableQueueReading_Should_PreventQueueProcessing_When_False()
+ {
+ _options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = false });
+ var contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options);
+
+ await contentfulServiceBusProcessor.InvokeNonPublicAsyncMethod("ExecuteAsync", [CancellationToken.None]);
+
+ Assert.Empty(_serviceBusProcessor.ReceivedCalls());
}
[Fact]