Skip to content

Commit

Permalink
Add ErrorProcessor test
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson committed Mar 31, 2023
1 parent f8ea9e2 commit 2de34a1
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 22 deletions.
13 changes: 8 additions & 5 deletions e2e/Tests/helpers/StorageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@ namespace Microsoft.Azure.Devices.E2ETests.Helpers
public class StorageContainer : IDisposable
{
private bool _disposed;
private readonly bool _deleteOnDispose;

public string ContainerName { get; }
public Uri Uri { get; private set; }
public Uri SasUri { get; private set; }
public CloudBlobContainer CloudBlobContainer { get; private set; }

private StorageContainer(string containerName)
private StorageContainer(string containerName, bool deleteOnDispose)
{
if (string.IsNullOrWhiteSpace(containerName))
{
containerName = Guid.NewGuid().ToString();
}

_deleteOnDispose = deleteOnDispose;
ContainerName = containerName;
}

public static async Task<StorageContainer> GetInstanceAsync(string containerName = null)
public static async Task<StorageContainer> GetInstanceAsync(string containerName = null, bool deleteOnDispose = true)
{
var sc = new StorageContainer(containerName);
var sc = new StorageContainer(containerName, deleteOnDispose);
await sc.InitializeAsync().ConfigureAwait(false);
return sc;
}
Expand Down Expand Up @@ -125,7 +126,9 @@ protected virtual void Dispose(bool disposing)
return;
}

if (disposing && CloudBlobContainer != null)
if (disposing
&& CloudBlobContainer != null
&& _deleteOnDispose)
{
CloudBlobContainer.Delete();
CloudBlobContainer = null;
Expand Down
95 changes: 78 additions & 17 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,86 +46,147 @@ public async Task FileUploadNotification_FileUploadNotificationProcessor_Receive
Protocol = protocol
};

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
using StorageContainer storage = await StorageContainer.GetInstanceAsync("fileupload", false).ConfigureAwait(false);
using var fileNotification = new SemaphoreSlim(1, 1);

try
{
var files = new Dictionary<string, bool>(filesToUpload);
var allFilesFound = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotification fileUploadNotification)
async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotification fileUploadNotification)
{
string fileName = fileUploadNotification.BlobName.Substring(fileUploadNotification.BlobName.IndexOf('/') + 1);
if (!files.ContainsKey(fileName))
{
// Notification does not belong to this test
return Task.FromResult(_defaultAcknowledgementType);
VerboseTestLogger.WriteLine($"Received notification for unrelated file {fileName}.");
return _defaultAcknowledgementType;
}

VerboseTestLogger.WriteLine($"Received notification for {fileName}.");
if (!files[fileName])
{
files[fileName] = true;
CloudBlob blob = storage.CloudBlobContainer.GetBlobReference(fileUploadNotification.BlobName);
VerboseTestLogger.WriteLine($"Deleting blob {fileUploadNotification.BlobName}...");
await blob.DeleteIfExistsAsync(cts.Token).ConfigureAwait(false);
}

files[fileName] = true;
if (files.All(x => x.Value))
{
VerboseTestLogger.WriteLine($"Notifications have been received for all files uploads!");
allFilesFound.TrySetResult(true);
}

return Task.FromResult(AcknowledgementType.Complete);
return AcknowledgementType.Complete;
}

serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceived;
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
if (shouldReconnect)
{
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Closing client...");
await serviceClient.FileUploadNotifications.CloseAsync(cts.Token).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Reopening client...");
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
}

for (int i = 0; i < filesToUpload; ++i)
{
string fileName = $"TestPayload-{Guid.NewGuid()}.txt";
files.Add(fileName, false);
await UploadFile(fileName).ConfigureAwait(false);
await UploadFile(fileName, cts.Token).ConfigureAwait(false);
}

await Task
.WhenAny(
allFilesFound.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
allFilesFound.Task.IsCompleted.Should().BeTrue();
}
finally
{
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}

[TestMethod]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(IotHubTransportProtocol protocol)
{
var options = new IotHubServiceClientOptions
{
Protocol = protocol
};

using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);

try
{
var errorProcessorNotified = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = (_) => Task.FromResult(_defaultAcknowledgementType);
serviceClient.FileUploadNotifications.ErrorProcessor = (errorContext) =>
{
VerboseTestLogger.WriteLine("Error processor fired.");
errorProcessorNotified.TrySetResult(true);
return Task.CompletedTask;
};

VerboseTestLogger.WriteLine("Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client opened.");

VerboseTestLogger.WriteLine("Client closing...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client closed.");

// The open file upload notification processor should be able to receive more than one
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await Task
.WhenAny(
allFilesFound.Task,
errorProcessorNotified.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
allFilesFound.Task.IsCompleted.Should().BeTrue();
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
{
serviceClient.FileUploadNotifications.ErrorProcessor = null;
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}

private async Task UploadFile(string fileName)
private async Task UploadFile(string fileName, CancellationToken ct)
{
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_devicePrefix).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(new IotHubClientAmqpSettings()));
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
await testDevice.OpenWithRetryAsync(ct).ConfigureAwait(false);

using var ms = new MemoryStream(Encoding.UTF8.GetBytes("TestPayload"));

Console.WriteLine($"Uploading file {fileName}");

VerboseTestLogger.WriteLine($"Uploading file {fileName}.");
var fileUploadSasUriRequest = new FileUploadSasUriRequest(fileName);
FileUploadSasUriResponse sasUri = await deviceClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest).ConfigureAwait(false);
FileUploadSasUriResponse sasUri = await deviceClient.GetFileUploadSasUriAsync(fileUploadSasUriRequest, ct).ConfigureAwait(false);
Uri uploadUri = sasUri.GetBlobUri();

var blob = new CloudBlockBlob(uploadUri);
await blob.UploadFromStreamAsync(ms).ConfigureAwait(false);
await blob.UploadFromStreamAsync(ms, ct).ConfigureAwait(false);

var successfulFileUploadCompletionNotification = new FileUploadCompletionNotification(sasUri.CorrelationId, true)
{
StatusCode = 200,
StatusDescription = "Success"
};

await deviceClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Completing upload for {fileName}");
await deviceClient.CompleteFileUploadAsync(successfulFileUploadCompletionNotification, ct).ConfigureAwait(false);
}
}
}

0 comments on commit 2de34a1

Please sign in to comment.