Skip to content

Commit

Permalink
tests and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jimwashbrook committed Dec 13, 2024
1 parent bf1ed9c commit 19c7884
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 53 deletions.
13 changes: 8 additions & 5 deletions src/Dfe.PlanTech.Application/Background/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
using System.Threading.Channels;
using Dfe.PlanTech.Domain.Background;
using Microsoft.Extensions.Options;

namespace Dfe.PlanTech.Application.Background;

public class BackgroundTaskQueue : IBackgroundTaskQueue
/// <inheritdoc cref="IBackgroundTaskQueue" />
public class BackgroundTaskQueue(IOptions<BackgroundTaskQueueOptions> options) : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, Task>> _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait
});
private readonly Channel<Func<CancellationToken, Task>> _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(CreateChannelOptions(options.Value));

/// <inheritdoc cref="IBackgroundTaskQueue" />
public async Task QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem)
{
ArgumentNullException.ThrowIfNull(workItem);
await _queue.Writer.WriteAsync(workItem);
}

/// <inheritdoc cref="IBackgroundTaskQueue" />
public async Task<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}

private static BoundedChannelOptions CreateChannelOptions(BackgroundTaskQueueOptions options) => new(options.MaxQueueSize) { FullMode = options.FullMode };
}
10 changes: 10 additions & 0 deletions src/Dfe.PlanTech.Domain/Background/BackgroundTaskQueueOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Channels;

namespace Dfe.PlanTech.Domain.Background;

/// <summary>
/// Options for <see cref="IBackgroundTaskQueue"/>
/// </summary>
/// <param name="MaxQueueSize">Maximum number of tasks that can be enqueued before the queue is full. Defaults to 10.</param>
/// <param name="FullMode">What to do when the queue is full. Defaults to wait. See <see cref="BoundedChannelFullMode" /> for more details.</param>
public record BackgroundTaskQueueOptions(int MaxQueueSize = 10, BoundedChannelFullMode FullMode = BoundedChannelFullMode.Wait);
24 changes: 24 additions & 0 deletions src/Dfe.PlanTech.Domain/Background/IBackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Dfe.PlanTech.Domain.Background;

/// <summary>
/// Queue for tasks to be ran in background
/// </summary>
public interface IBackgroundTaskQueue
{
/// <summary>
/// Add an async operation to the queue for background processing.
/// </summary>
/// <param name="workItem"></param>
/// <returns></returns>
Task QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);

/// <summary>
/// Removes an item from the queue
/// </summary>
/// <remarks>
/// Will wait until an item exists
/// </remarks>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
8 changes: 0 additions & 8 deletions src/Dfe.PlanTech.Domain/Background/IBackgroundTaskService.cs

This file was deleted.

58 changes: 58 additions & 0 deletions src/Dfe.PlanTech.Web/Background/BackgroundTaskHostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using Dfe.PlanTech.Domain.Background;

namespace Dfe.PlanTech.Web.Background;

/// <summary>
/// Reads tasks from a <see cref="IBackgroundTaskQueue"/>, and runs them.
/// </summary>
/// <param name="logger"></param>
/// <param name="taskQueue"></param>
public class BackgroundTaskHostedService(ILogger<BackgroundTaskHostedService> logger, IBackgroundTaskQueue taskQueue) : BackgroundService
{
/// <summary>
/// Starts processing the queue
/// </summary>
/// <param name="stoppingToken"></param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Starting processing background tasks");
await BackgroundProcessing(stoppingToken);
}

/// <summary>
/// Processes the queue in a loop. Waits for a task to exist in the queue, reads it, and runs it.
/// </summary>
/// <param name="stoppingToken"></param>
/// <returns></returns>
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem = await taskQueue.DequeueAsync(stoppingToken);

logger.LogInformation("Read item from the queue");

try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}

/// <summary>
/// Stops procesing of the queue
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping processing background tasks");

await base.StopAsync(cancellationToken);
}
}
39 changes: 0 additions & 39 deletions src/Dfe.PlanTech.Web/Background/BackgroundTaskService.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/Dfe.PlanTech.Web/ProgramExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public static IServiceCollection AddRedisServices(this IServiceCollection servic

services.AddSingleton<IRedisDependencyManager, RedisDependencyManager>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
services.AddHostedService<BackgroundTaskHostedService>();

return services;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Dfe.PlanTech.Application.Background;
using Dfe.PlanTech.Domain.Background;
using Dfe.PlanTech.Web.Background;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NSubstitute;
using Xunit;

namespace Dfe.PlanTech.Web.UnitTests.Background;
public class BackgroundTaskHostedServiceTests
{
private readonly ILogger<BackgroundTaskHostedService> logger = Substitute.For<ILogger<BackgroundTaskHostedService>>();
private readonly IOptions<BackgroundTaskQueueOptions> options = Substitute.For<IOptions<BackgroundTaskQueueOptions>>();
private readonly BackgroundTaskQueue _taskQueue;
private readonly BackgroundTaskHostedService _service;
private string _mockResult = "";
private readonly string _expectedResult = "Test has ran";

public BackgroundTaskHostedServiceTests()
{
options.Value.Returns(new BackgroundTaskQueueOptions());
_taskQueue = new BackgroundTaskQueue(options);
_service = new BackgroundTaskHostedService(logger, _taskQueue);
}

[Fact]
public async Task Should_Read_From_Queue()
{
var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(2000);
var cancellationToken = cancellationTokenSource.Token;

await Task.WhenAll(_taskQueue.QueueBackgroundWorkItemAsync(ct =>
{
_mockResult = _expectedResult;
return Task.CompletedTask;
}),
_service.StartAsync(cancellationToken));

Assert.Equal(_expectedResult, _mockResult);
cancellationTokenSource.Dispose();

var loggedMessages = logger.ReceivedLogMessages().ToArray();
Assert.Contains(loggedMessages, message => message.Message.Equals("Starting processing background tasks") && message.LogLevel == LogLevel.Information);
Assert.Contains(loggedMessages, message => message.Message.Equals("Read item from the queue") && message.LogLevel == LogLevel.Information);
}

[Fact]
public async Task BackgroundProcessing_ShouldLogError_WhenWorkItemFails()
{
var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(2000);
var cancellationToken = cancellationTokenSource.Token;

await Task.WhenAll(_taskQueue.QueueBackgroundWorkItemAsync(ct =>
{
throw new Exception("An error occurred with the task");
}),
_service.StartAsync(cancellationToken));
cancellationTokenSource.Dispose();

var loggedMessages = logger.ReceivedLogMessages().ToArray();

Assert.Contains(loggedMessages, message => message.Message.Equals("Starting processing background tasks") && message.LogLevel == LogLevel.Information);
Assert.Contains(loggedMessages, message => message.Message.Equals("Read item from the queue") && message.LogLevel == LogLevel.Information);
Assert.Contains(loggedMessages, message => message.Message.StartsWith("Error occurred executing") && message.LogLevel == LogLevel.Error);
}

[Fact]
public async Task StopAsync_ShouldLogStoppingMessage()
{
// Arrange
var cancellationToken = new CancellationTokenSource().Token;

// Act
await _service.StopAsync(cancellationToken);

// Assert
Assert.Contains(logger.ReceivedLogMessages(), message => message.Message.Equals("Stopping processing background tasks") && message.LogLevel == LogLevel.Information);
}
}

0 comments on commit 19c7884

Please sign in to comment.