Skip to content

Commit

Permalink
Make file upload notification callback async and update doc comments (#…
Browse files Browse the repository at this point in the history
…3224)

* Make file upload notification callback async and update doc comments

* Update doc comments

* Make error processor return Task to indicate async operations are allowed

* PR feedback

* Update tests

* Add ErrorProcessor test
  • Loading branch information
David R. Williamson authored Mar 31, 2023
1 parent 737159e commit 2816428
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 146 deletions.
13 changes: 8 additions & 5 deletions e2e/Tests/helpers/StorageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@ namespace Microsoft.Azure.Devices.E2ETests.Helpers
public class StorageContainer : IDisposable
{
private bool _disposed;
private readonly bool _deleteOnDispose;

public string ContainerName { get; }
public Uri Uri { get; private set; }
public Uri SasUri { get; private set; }
public CloudBlobContainer CloudBlobContainer { get; private set; }

private StorageContainer(string containerName)
private StorageContainer(string containerName, bool deleteOnDispose)
{
if (string.IsNullOrWhiteSpace(containerName))
{
containerName = Guid.NewGuid().ToString();
}

_deleteOnDispose = deleteOnDispose;
ContainerName = containerName;
}

public static async Task<StorageContainer> GetInstanceAsync(string containerName = null)
public static async Task<StorageContainer> GetInstanceAsync(string containerName = null, bool deleteOnDispose = true)
{
var sc = new StorageContainer(containerName);
var sc = new StorageContainer(containerName, deleteOnDispose);
await sc.InitializeAsync().ConfigureAwait(false);
return sc;
}
Expand Down Expand Up @@ -125,7 +126,9 @@ protected virtual void Dispose(bool disposing)
return;
}

if (disposing && CloudBlobContainer != null)
if (disposing
&& CloudBlobContainer != null
&& _deleteOnDispose)
{
CloudBlobContainer.Delete();
CloudBlobContainer = null;
Expand Down
218 changes: 105 additions & 113 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Storage.Blob;
Expand All @@ -27,74 +31,93 @@ public class FileUploadNotificationE2ETest : E2EMsTestBase
// choosing to Abandon rather than Complete because each test process may receive file upload
// notifications that another test was looking for. By abandoning each received notification,
// the service makes it available for redelivery to other open receivers.
private readonly AcknowledgementType _acknowledgementType = AcknowledgementType.Abandon;
private readonly AcknowledgementType _defaultAcknowledgementType = AcknowledgementType.Abandon;

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_Operation(IotHubTransportProtocol protocol)
[DataRow(IotHubTransportProtocol.Tcp, 1, false)]
[DataRow(IotHubTransportProtocol.Tcp, 2, false)]
[DataRow(IotHubTransportProtocol.Tcp, 1, true)]
[DataRow(IotHubTransportProtocol.WebSocket, 1, false)]
[DataRow(IotHubTransportProtocol.WebSocket, 1, true)]
public async Task FileUploadNotification_FileUploadNotificationProcessor_ReceivesNotifications(IotHubTransportProtocol protocol, int filesToUpload, bool shouldReconnect)
{
var options = new IotHubServiceClientOptions
{
Protocol = protocol
};

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
using StorageContainer storage = await StorageContainer.GetInstanceAsync("fileupload", false).ConfigureAwait(false);
using var fileNotification = new SemaphoreSlim(1, 1);

var counter = new FileUploadNotificationCounter();
Func<FileUploadNotification, AcknowledgementType> OnFileUploadNotificationReceived = (fileUploadNotification) =>
try
{
counter.FileUploadNotificationsReceived++;
return _acknowledgementType;
};

serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceived;

await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
await UploadFile().ConfigureAwait(false);
await WaitForFileUploadNotification(counter, 1);
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
var files = new Dictionary<string, bool>(filesToUpload);
var allFilesFound = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotification fileUploadNotification)
{
string fileName = fileUploadNotification.BlobName.Substring(fileUploadNotification.BlobName.IndexOf('/') + 1);
if (!files.ContainsKey(fileName))
{
// Notification does not belong to this test
VerboseTestLogger.WriteLine($"Received notification for unrelated file {fileName}.");
return _defaultAcknowledgementType;
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_Operation_OpenCloseOpen(IotHubTransportProtocol protocol)
{
var options = new IotHubServiceClientOptions
{
Protocol = protocol
};
VerboseTestLogger.WriteLine($"Received notification for {fileName}.");
if (!files[fileName])
{
files[fileName] = true;
CloudBlob blob = storage.CloudBlobContainer.GetBlobReference(fileUploadNotification.BlobName);
VerboseTestLogger.WriteLine($"Deleting blob {fileUploadNotification.BlobName}...");
await blob.DeleteIfExistsAsync(cts.Token).ConfigureAwait(false);
}

using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
if (files.All(x => x.Value))
{
VerboseTestLogger.WriteLine($"Notifications have been received for all files uploads!");
allFilesFound.TrySetResult(true);
}

var counter = new FileUploadNotificationCounter();
Func<FileUploadNotification, AcknowledgementType> OnFileUploadNotificationReceived = (fileUploadNotification) =>
{
counter.FileUploadNotificationsReceived++;
return _acknowledgementType;
};
return AcknowledgementType.Complete;
}

serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceived;
serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceived;
VerboseTestLogger.WriteLine($"Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
if (shouldReconnect)
{
VerboseTestLogger.WriteLine($"Closing client...");
await serviceClient.FileUploadNotifications.CloseAsync(cts.Token).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Reopening client...");
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
}

// Close and re-open the client
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
for (int i = 0; i < filesToUpload; ++i)
{
string fileName = $"TestPayload-{Guid.NewGuid()}.txt";
files.Add(fileName, false);
await UploadFile(fileName, cts.Token).ConfigureAwait(false);
}

// Client should still be able to receive file upload notifications after being closed and re-opened.
await UploadFile().ConfigureAwait(false);
await WaitForFileUploadNotification(counter, 1);
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
await Task
.WhenAny(
allFilesFound.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
allFilesFound.Task.IsCompleted.Should().BeTrue();
}
finally
{
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_ReceiveMultipleNotificationsInOneConnection(IotHubTransportProtocol protocol)
public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(IotHubTransportProtocol protocol)
{
var options = new IotHubServiceClientOptions
{
Expand All @@ -103,98 +126,67 @@ public async Task FileUploadNotification_ReceiveMultipleNotificationsInOneConnec

using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);

var counter = new FileUploadNotificationCounter();
Func<FileUploadNotification, AcknowledgementType> OnFileUploadNotificationReceived = (fileUploadNotification) =>
{
counter.FileUploadNotificationsReceived++;
return _acknowledgementType;
};

serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceived;

await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
await UploadFile().ConfigureAwait(false);
await UploadFile().ConfigureAwait(false);

// The open file upload notification processor should be able to receive more than one
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
await WaitForFileUploadNotification(counter, 2);
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}

/// <summary>
/// Wait until the expected number of file upload notifications have been received. If the expected
/// number of notifications are not received in time, this method throws a AssertionFailedException.
/// </summary>
/// <param name="fileUploadNotificationReceivedCount">The current number of file upload notifications received.</param>
/// <param name="expectedFileUploadNotificationReceivedCount">The expected number of file upload notifications to receive in this test.</param>
private static async Task WaitForFileUploadNotification(FileUploadNotificationCounter counter, int expectedFileUploadNotificationReceivedCount)
{
var timer = Stopwatch.StartNew();
try
{
// Note that this test may receive notifications from other file upload tests, so the received count may be higher
// than the expected count.
while (counter.FileUploadNotificationsReceived < expectedFileUploadNotificationReceivedCount)
var errorProcessorNotified = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = (_) => Task.FromResult(_defaultAcknowledgementType);
serviceClient.FileUploadNotifications.ErrorProcessor = (errorContext) =>
{
if (timer.ElapsedMilliseconds > 200000)
{
throw new AssertFailedException(
$"Timed out waiting for the expected number of file upload notifications. Received {counter.FileUploadNotificationsReceived}, expected {expectedFileUploadNotificationReceivedCount}");
}

await Task.Delay(800);
}
VerboseTestLogger.WriteLine("Error processor fired.");
errorProcessorNotified.TrySetResult(true);
return Task.CompletedTask;
};

VerboseTestLogger.WriteLine("Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client opened.");

VerboseTestLogger.WriteLine("Client closing...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client closed.");

// The open file upload notification processor should be able to receive more than one
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await Task
.WhenAny(
errorProcessorNotified.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
{
timer.Stop();
serviceClient.FileUploadNotifications.ErrorProcessor = null;
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}

private async Task UploadFile()
private async Task UploadFile(string fileName, CancellationToken ct)
{
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_devicePrefix).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(new IotHubClientAmqpSettings()));
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
const string filePath = "TestPayload.txt";
using FileStream fileStreamSource = File.Create(filePath);
using var sr = new StreamWriter(fileStreamSource);

sr.WriteLine("TestPayload");
string fileName = Path.GetFileName(fileStreamSource.Name);

Console.WriteLine($"Uploading file {fileName}");
await testDevice.OpenWithRetryAsync(ct).ConfigureAwait(false);

var fileUploadTime = Stopwatch.StartNew();
using var ms = new MemoryStream(Encoding.UTF8.GetBytes("TestPayload"));

VerboseTestLogger.WriteLine($"Uploading file {fileName}.");
var fileUploadSasUriRequest = new FileUploadSasUriRequest(fileName);
FileUploadSasUriResponse sasUri = await deviceClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest).ConfigureAwait(false);
FileUploadSasUriResponse sasUri = await deviceClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest, ct).ConfigureAwait(false);
Uri uploadUri = sasUri.GetBlobUri();

var blob = new CloudBlockBlob(uploadUri);
await blob.UploadFromStreamAsync(fileStreamSource).ConfigureAwait(false);
await blob.UploadFromStreamAsync(ms, ct).ConfigureAwait(false);

var successfulFileUploadCompletionNotification = new FileUploadCompletionNotification(sasUri.CorrelationId, true)
{
StatusCode = 200,
StatusDescription = "Success"
};

await deviceClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification).ConfigureAwait(false);
}

// This class exists to facilitate passing around an integer by reference. It is incremented
// in a callback function and has its value checked in the WaitForFileUploadNotification function.
private class FileUploadNotificationCounter
{
public FileUploadNotificationCounter()
{
FileUploadNotificationsReceived = 0;
}

public int FileUploadNotificationsReceived;
VerboseTestLogger.WriteLine($"Completing upload for {fileName}");
await deviceClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification, ct).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private async Task ReceiveFileUploadNotificationsAsync(string targetDeviceId, Ca

_logger.LogInformation($"Listening for file upload notifications from the service.");

AcknowledgementType FileUploadNotificationCallback(FileUploadNotification fileUploadNotification)
Task<AcknowledgementType> FileUploadNotificationCallback(FileUploadNotification fileUploadNotification)
{
AcknowledgementType ackType = AcknowledgementType.Abandon;

Expand Down Expand Up @@ -100,7 +100,7 @@ AcknowledgementType FileUploadNotificationCallback(FileUploadNotification fileUp
_totalNotificationsCompleted++;
}

return ackType;
return Task.FromResult(ackType);
}

s_serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = FileUploadNotificationCallback;
Expand Down
Loading

0 comments on commit 2816428

Please sign in to comment.