Skip to content

Commit

Permalink
Merge pull request #463 from DFE-Digital/feat/add-error-to-deadletter…
Browse files Browse the repository at this point in the history
…-message

feat: add error message to deadletter queue
  • Loading branch information
jimwashbrook authored Jan 18, 2024
2 parents 656c871 + aa34736 commit 96f39d9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
22 changes: 11 additions & 11 deletions src/Dfe.PlanTech.AzureFunctions/Functions/QueueReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public QueueReceiver(ILoggerFactory loggerFactory, CmsDbContext db, JsonToEntity
}

[Function("QueueReceiver")]
public async Task QueueReceiverDbWriter([ServiceBusTrigger("contentful", IsBatched = true)] ServiceBusReceivedMessage[] messages, ServiceBusMessageActions messageActions)
public async Task QueueReceiverDbWriter([ServiceBusTrigger("contentful", IsBatched = true)] ServiceBusReceivedMessage[] messages, ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
Logger.LogInformation("Queue Receiver -> Db Writer started. Processing {msgCount} messages", messages.Length);

foreach (ServiceBusReceivedMessage message in messages)
{
await ProcessMessage(message, messageActions);
await ProcessMessage(message, messageActions, cancellationToken);
}
}

private async Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions)
private async Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
try
{
Expand All @@ -47,7 +47,7 @@ private async Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusM
Logger.LogInformation("Processing {text}", text);

ContentComponentDbEntity mapped = _mappers.ToEntity(text);
ContentComponentDbEntity? existing = await GetExistingDbEntity(mapped);
ContentComponentDbEntity? existing = await GetExistingDbEntity(mapped, cancellationToken);

if (existing != null)
{
Expand Down Expand Up @@ -81,7 +81,7 @@ private async Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusM
throw new CmsEventException(string.Format("CMS Event \"{0}\" not implemented", cmsEvent));
}

long rowsChanged = await UpsertEntityInDatabase(mapped, existing);
long rowsChanged = await UpsertEntityInDatabase(mapped, existing, cancellationToken);

if (rowsChanged == 0L)
{
Expand All @@ -92,23 +92,23 @@ private async Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusM
Logger.LogInformation($"Updated {rowsChanged} rows in the database");
}

await messageActions.CompleteMessageAsync(message);
await messageActions.CompleteMessageAsync(message, cancellationToken);
}
catch (Exception ex)
{
Logger.LogError(ex.Message);
await messageActions.DeadLetterMessageAsync(message);
await messageActions.DeadLetterMessageAsync(message, null, ex.Message, ex.StackTrace, cancellationToken);
}
}

private async Task<ContentComponentDbEntity?> GetExistingDbEntity(ContentComponentDbEntity entity)
private async Task<ContentComponentDbEntity?> GetExistingDbEntity(ContentComponentDbEntity entity, CancellationToken cancellationToken)
{
var model = _db.Model.FindEntityType(entity.GetType()) ?? throw new Exception($"Could not find model in database for {entity.GetType()}");

var dbSet = GetIQueryableForEntity(model);

var found = await dbSet.IgnoreAutoIncludes()
.FirstOrDefaultAsync(existing => existing.Id == entity.Id);
.FirstOrDefaultAsync(existing => existing.Id == entity.Id, cancellationToken);

return found ?? null;
}
Expand All @@ -125,7 +125,7 @@ private static string GetCmsEvent(string subject)
return subject.AsSpan()[(subject.LastIndexOf('.') + 1)..].ToString();
}

private async Task<long> UpsertEntityInDatabase(ContentComponentDbEntity entity, ContentComponentDbEntity? existing)
private async Task<long> UpsertEntityInDatabase(ContentComponentDbEntity entity, ContentComponentDbEntity? existing, CancellationToken cancellationToken)
{
if (existing == null)
{
Expand All @@ -136,7 +136,7 @@ private async Task<long> UpsertEntityInDatabase(ContentComponentDbEntity entity,
UpdateProperties(entity, existing);
}

return await _db.SaveChangesAsync();
return await _db.SaveChangesAsync(cancellationToken);
}

private void UpdateProperties(ContentComponentDbEntity entity, ContentComponentDbEntity existing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Dfe.PlanTech.Domain.Questionnaire.Models;
using Dfe.PlanTech.Infrastructure.Application.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;

Expand All @@ -30,6 +31,7 @@ public class GetPageQueryTests
private readonly ILogger<GetPageQuery> _logger = Substitute.For<ILogger<GetPageQuery>>();
private readonly IMapper _mapperSubstitute = Substitute.For<IMapper>();
private readonly IQuestionnaireCacher _questionnaireCacherSubstitute = Substitute.For<IQuestionnaireCacher>();
private readonly GetPageFromDbQuery _getPageFromDbQuery;

private readonly List<Page> _pages = new() {
new Page(){
Expand Down Expand Up @@ -175,6 +177,7 @@ public class GetPageQueryTests

public GetPageQueryTests()
{
_getPageFromDbQuery = new GetPageFromDbQuery(_cmsDbSubstitute, new NullLogger<GetPageFromDbQuery>(), _mapperSubstitute, new[] { _getPageChildrenQuery });
_cmsDbSubstitute.ToListAsync(Arg.Any<IQueryable<SectionDbEntity>>())
.Returns(callinfo =>
{
Expand Down Expand Up @@ -282,7 +285,7 @@ private void SetupRepository()
}

private IGetPageQuery CreateGetPageQuery()
=> new GetPageQuery(_cmsDbSubstitute, _logger, _mapperSubstitute, _questionnaireCacherSubstitute, _repoSubstitute, new[] { _getPageChildrenQuery });
=> new GetPageQuery(_getPageFromDbQuery, _logger, _questionnaireCacherSubstitute, _repoSubstitute);

private void SetupQuestionnaireCacher()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public async Task QueueReceiverDbWriter_Should_Execute_Successfully()

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await _cmsDbContextMock.ReceivedWithAnyArgs(1).SaveChangesAsync(Arg.Any<CancellationToken>());
Expand Down Expand Up @@ -130,9 +130,9 @@ public async Task QueueReceiverDbWriter_Should_DeadLetter_Failed_Operation()

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().DeadLetterMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().DeadLetterMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), null, Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>());
}

[Fact]
Expand All @@ -147,10 +147,11 @@ public async Task QueueReceiverDbWriter_Should_MapExistingDbEntity_To_Message()

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());
_cmsDbContextMock.ReceivedWithAnyArgs(0).Add(Arg.Any<ContentComponentDbEntity>());
await _cmsDbContextMock.ReceivedWithAnyArgs(1).SaveChangesAsync(Arg.Any<CancellationToken>());
}

[Fact]
Expand All @@ -166,9 +167,9 @@ public async Task QueueRecieverDbWriter_Should_CompleteSuccessfully_After_Archiv

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());

var added = _addedObject as ContentComponentDbEntity;
Assert.NotNull(added);
Expand All @@ -188,9 +189,9 @@ public async Task QueueRecieverDbWriter_Should_CompleteSuccessfully_After_Unarch

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());

var added = _addedObject as ContentComponentDbEntity;
Assert.NotNull(added);
Expand All @@ -210,9 +211,9 @@ public async Task QueueRecieverDbWriter_Should_CompleteSuccessfully_After_Publis

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());

var added = _addedObject as ContentComponentDbEntity;
Assert.NotNull(added);
Expand All @@ -232,9 +233,9 @@ public async Task QueueRecieverDbWriter_Should_CompleteSuccessfully_After_Unpubl

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());

var added = _addedObject as ContentComponentDbEntity;
Assert.NotNull(added);
Expand All @@ -254,9 +255,9 @@ public async Task QueueRecieverDbWriter_Should_CompleteSuccessfully_After_Delete

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().CompleteMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), Arg.Any<CancellationToken>());

var added = _addedObject as ContentComponentDbEntity;
Assert.NotNull(added);
Expand All @@ -274,8 +275,8 @@ public async Task QueueRecieverDbWriter_Should_DeadLetterQueue_If_ActionIsInvali

ServiceBusReceivedMessage serviceBusReceivedMessage = ServiceBusReceivedMessage.FromAmqpMessage(serviceBusMessage.GetRawAmqpMessage(), BinaryData.FromBytes(Encoding.UTF8.GetBytes(serviceBusReceivedMessageMock.LockToken)));

await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock);
await _queueReceiver.QueueReceiverDbWriter(new ServiceBusReceivedMessage[] { serviceBusReceivedMessage }, serviceBusMessageActionsMock, CancellationToken.None);

await serviceBusMessageActionsMock.Received().DeadLetterMessageAsync(Arg.Any<ServiceBusReceivedMessage>());
await serviceBusMessageActionsMock.Received().DeadLetterMessageAsync(Arg.Any<ServiceBusReceivedMessage>(), null, Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Dfe.PlanTech.Application.Persistence.Interfaces;
using Dfe.PlanTech.Domain.Content.Interfaces;
using Dfe.PlanTech.Domain.Content.Models;
using Dfe.PlanTech.Domain.Content.Queries;
using Dfe.PlanTech.Domain.Cookie.Interfaces;
using Dfe.PlanTech.Infrastructure.Application.Models;
using Dfe.PlanTech.Web.Controllers;
Expand All @@ -12,6 +13,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.ViewFeatures;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Primitives;
using Microsoft.Net.Http.Headers;
using NSubstitute;
Expand All @@ -24,6 +26,7 @@ public class CookieControllerTests
private readonly ICmsDbContext _db = Substitute.For<ICmsDbContext>();
private readonly ILogger<GetPageQuery> _getPageLogger = Substitute.For<ILogger<GetPageQuery>>();
private readonly IMapper _mapper = Substitute.For<IMapper>();
private readonly IGetPageQuery _getPageFromDbQuery;

private readonly Page[] _pages = new Page[]
{
Expand All @@ -46,6 +49,11 @@ public static CookiesController CreateStrut()
};
}

public CookieControllerTests()
{
_getPageFromDbQuery = Substitute.For<GetPageFromDbQuery>(_db, new NullLogger<GetPageFromDbQuery>(), _mapper, Array.Empty<IGetPageChildrenQuery>());
}

[Theory]
[InlineData("https://localhost:8080/self-assessment")]
[InlineData("https://www.dfe.gov.uk/self-assessment")]
Expand Down Expand Up @@ -134,7 +142,7 @@ public async Task CookiesPageDisplays()
{
IQuestionnaireCacher questionnaireCacherSubstitute = Substitute.For<IQuestionnaireCacher>();
IContentRepository contentRepositorySubstitute = SetupRepositorySubstitute();
GetPageQuery _getPageQuerySubstitute = Substitute.For<GetPageQuery>(_db, _getPageLogger, _mapper, questionnaireCacherSubstitute, contentRepositorySubstitute, Array.Empty<IGetPageChildrenQuery>());
GetPageQuery _getPageQuerySubstitute = Substitute.For<GetPageQuery>(_getPageFromDbQuery, _getPageLogger, questionnaireCacherSubstitute, contentRepositorySubstitute);

CookiesController cookiesController = CreateStrut();
var result = await cookiesController.GetCookiesPage(_getPageQuerySubstitute);
Expand Down

0 comments on commit 96f39d9

Please sign in to comment.