Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add option to disable Service Bus processing #936

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ namespace Dfe.PlanTech.Domain.ServiceBus.Models;

public record ServiceBusOptions
{
public int MessagesPerBatch { get; init; } = 10;
/// <summary>
/// Enables reading + processing of messages from the Service Bus
/// </summary>
public bool EnableQueueReading { get; init; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
using Azure.Messaging.ServiceBus;
using Dfe.PlanTech.Application.Persistence.Commands;
using Dfe.PlanTech.Domain.Persistence.Interfaces;
using Dfe.PlanTech.Domain.ServiceBus.Models;
using Dfe.PlanTech.Infrastructure.ServiceBus.Results;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Dfe.PlanTech.Infrastructure.ServiceBus;

Expand All @@ -20,7 +22,8 @@ namespace Dfe.PlanTech.Infrastructure.ServiceBus;
public class ContentfulServiceBusProcessor(IAzureClientFactory<ServiceBusProcessor> processorFactory,
IServiceBusResultProcessor resultProcessor,
ILogger<ContentfulServiceBusProcessor> logger,
IServiceScopeFactory serviceScopeFactory) : BackgroundService
IServiceScopeFactory serviceScopeFactory,
IOptions<ServiceBusOptions> options) : BackgroundService
{
private readonly ServiceBusProcessor _processor = processorFactory.CreateClient("contentfulprocessor");

Expand All @@ -29,12 +32,17 @@ public class ContentfulServiceBusProcessor(IAzureClientFactory<ServiceBusProcess
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!options.Value.EnableQueueReading)
{
logger.LogInformation("{QueueReadingProperty} is set to disabling - not enabling processing queue reading", nameof(options.Value.EnableQueueReading));
return;
}
_processor.ProcessMessageAsync += MessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;

await _processor.StartProcessingAsync(stoppingToken);

stoppingToken.Register(async () => await StopProcessingAsync());
stoppingToken.Register(() => StopProcessingAsync().GetAwaiter().GetResult());
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private static IServiceCollection AddServiceBusServices(this IServiceCollection
services.AddTransient<IQueueWriter, QueueWriter>();
services.AddTransient<IWriteCmsWebhookToQueueCommand, WriteCmsWebhookToQueueCommand>();

services.AddSingleton(new ServiceBusOptions() { MessagesPerBatch = 10 });
services.Configure<ServiceBusOptions>(configuration.GetSection(nameof(ServiceBusOptions)));
return services;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Dfe.PlanTech.Web/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@
"GTM": {
"Id": "GTM-MFLR9ZK",
"SiteVerificationId": "rhK9PD6EMeai5M5a3qySWTZTxwZxyzUjU2fifZ_ezjs"
},
"ServiceBusOptions": {
"EnableQueueReading": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NSubstitute;
using NSubstitute.ExceptionExtensions;

Expand All @@ -21,6 +22,7 @@ public class ContentfulServiceBusProcessorTests
private readonly ILogger<ContentfulServiceBusProcessor> _logger = Substitute.For<ILogger<ContentfulServiceBusProcessor>>();
private readonly IWebhookToDbCommand _webhookToDbCommand = Substitute.For<IWebhookToDbCommand>();
private readonly IServiceScopeFactory _serviceScopeFactory = Substitute.For<IServiceScopeFactory>();
private readonly IOptions<ServiceBusOptions> _options = Substitute.For<IOptions<ServiceBusOptions>>();
private readonly IServiceBusResultProcessor _serviceBusResultProcessor;
private readonly ContentfulServiceBusProcessor _contentfulServiceBusProcessor;

Expand All @@ -36,7 +38,19 @@ public ContentfulServiceBusProcessorTests()

_serviceBusResultProcessor = Substitute.For<IServiceBusResultProcessor>();
_processorFactory.CreateClient("contentfulprocessor").Returns(_serviceBusProcessor);
_contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory);
_options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = true });
_contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options);
}

[Fact]
public async Task EnableQueueReading_Should_PreventQueueProcessing_When_False()
{
_options.Value.Returns(new ServiceBusOptions() { EnableQueueReading = false });
var contentfulServiceBusProcessor = new ContentfulServiceBusProcessor(_processorFactory, _serviceBusResultProcessor, _logger, _serviceScopeFactory, _options);

await contentfulServiceBusProcessor.InvokeNonPublicAsyncMethod("ExecuteAsync", [CancellationToken.None]);

Assert.Empty(_serviceBusProcessor.ReceivedCalls());
}

[Fact]
Expand Down
Loading