Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix amqp objects closure event handlers #3127

Merged
merged 6 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions e2e/test/helpers/TestDeviceCallbackHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Message ExpectedMessageSentByService
set => Volatile.Write(ref _expectedMessageSentByService, value);
}

public async Task SetDeviceReceiveMethodAsync<T>(string methodName, object deviceResponseJson, T expectedServiceRequestJson)
public async Task SetDeviceReceiveMethodAsync<T>(string methodName, object deviceResponse, T expectedServiceRequestJson)
{
await _deviceClient.OpenAsync().ConfigureAwait(false);
await _deviceClient.SetDirectMethodCallbackAsync(
Expand All @@ -58,9 +58,9 @@ await _deviceClient.SetDirectMethodCallbackAsync(
request.TryGetPayload(out T actualRequestPayload).Should().BeTrue();
actualRequestPayload.Should().BeEquivalentTo(expectedServiceRequestJson, "The expected method data should match what was sent from service");

var response = new Client.DirectMethodResponse(200)
var response = new DirectMethodResponse(200)
{
Payload = deviceResponseJson,
Payload = deviceResponse,
};
return Task.FromResult(response);
}
Expand Down
3 changes: 1 addition & 2 deletions e2e/test/helpers/templates/FaultInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static async Task TestErrorInjectionAsync(
{
using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(devicePrefix, type).ConfigureAwait(false);

IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSettings));
await using IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSettings));

int connectionStatusChangeCount = 0;

Expand Down Expand Up @@ -207,7 +207,6 @@ void OnConnectionStatusChanged(ConnectionStatusInfo connectionStatusInfo)
{
await cleanupOperation().ConfigureAwait(false);
}
await deviceClient.DisposeAsync();
await testDevice.RemoveDeviceAsync().ConfigureAwait(false);

if (!FaultShouldDisconnect(faultType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
Expand Down
2 changes: 1 addition & 1 deletion e2e/test/iothub/device/MessageReceiveE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestCategory("E2E")]
[TestCategory("IoTHub")]
[TestCategory("LongRunning")]
public partial class MessageReceiveE2ETests : E2EMsTestBase
public class MessageReceiveE2ETests : E2EMsTestBase
{
private static readonly string s_devicePrefix = $"{nameof(MessageReceiveE2ETests)}_";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ private async Task ReceiveMessageWithCallbackRecoveryAsync(
{
TestDeviceCallbackHandler testDeviceCallbackHandler = null;
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
await serviceClient.Messages.OpenAsync().ConfigureAwait(false);

async Task InitOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
await serviceClient.Messages.OpenAsync().ConfigureAwait(false);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await deviceClient.OpenAsync(cts.Token).ConfigureAwait(false);

Expand Down
44 changes: 0 additions & 44 deletions e2e/test/iothub/device/MessageSendFaultInjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,50 +187,6 @@ await SendMessageRecoveryAsync(
.ConfigureAwait(false);
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DoNotParallelize]
public async Task Message_ThrottledConnectionLongTimeNoRecovery_Amqp()
{
// act
Func<Task> act = async () =>
{
await SendMessageRecoveryAsync(
new IotHubClientAmqpSettings(),
FaultInjectionConstants.FaultType_Throttle,
FaultInjectionConstants.FaultCloseReason_Boom,
FaultInjection.ShortRetryDuration)
.ConfigureAwait(false);
};

// assert
var error = await act.Should().ThrowAsync<IotHubClientException>();
error.And.ErrorCode.Should().Be(IotHubClientErrorCode.Throttled);
error.And.IsTransient.Should().BeTrue();
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DoNotParallelize]
public async Task Message_ThrottledConnectionLongTimeNoRecovery_AmqpWs()
{
// act
Func<Task> act = async () =>
{
await SendMessageRecoveryAsync(
new IotHubClientAmqpSettings(IotHubClientTransportProtocol.WebSocket),
FaultInjectionConstants.FaultType_Throttle,
FaultInjectionConstants.FaultCloseReason_Boom,
FaultInjection.ShortRetryDuration)
.ConfigureAwait(false);
};

// assert
var error = await act.Should().ThrowAsync<IotHubClientException>();
error.And.ErrorCode.Should().Be(IotHubClientErrorCode.Throttled);
error.And.IsTransient.Should().BeTrue();
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DoNotParallelize]
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private async Task RefreshSasTokenLoopAsync(DateTime refreshesOn, CancellationTo

await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
}

refreshesOn = await RefreshSasTokenAsync(cancellationToken).ConfigureAwait(false);

waitTime = refreshesOn - DateTime.UtcNow;
Expand Down
7 changes: 6 additions & 1 deletion iothub/device/src/Transport/AmqpIot/AmqpIotConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ internal AmqpIotCbsLink GetCbsLink()
return _amqpIotCbsLink;
}

// This event handler is not invoked by the AMQP library in an async fashion.
// This also co-relates with the fact that AmqpConnection.SafeClose() is a sync method.
internal void AmqpConnectionClosed(object sender, EventArgs e)
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(AmqpConnectionClosed));

Closed?.Invoke(this, e);

// After the Closed event handler has been invoked, the AmqpConnection has now been effectively cleaned up.
// This is a good point for us to detach the Closed event handler from the AmqpConnection instance.
_amqpConnection.Closed -= AmqpConnectionClosed;

if (Logging.IsEnabled)
Logging.Exit(this, nameof(AmqpConnectionClosed));
}
Expand Down Expand Up @@ -104,7 +110,6 @@ internal async Task<IAmqpAuthenticationRefresher> CreateRefresherAsync(IConnecti

internal void SafeClose()
{
_amqpConnection.Closed -= AmqpConnectionClosed;
_amqpConnection.SafeClose();
}

Expand Down
11 changes: 8 additions & 3 deletions iothub/device/src/Transport/AmqpIot/AmqpIotReceivingLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,19 @@ public AmqpIotReceivingLink(ReceivingAmqpLink receivingAmqpLink, PayloadConventi
_payloadConvention = payloadConvention;
}

// This event handler is not invoked by the AMQP library in an async fashion.
// This also co-relates with the fact that ReceivingAmqpLink.SafeClose() is a sync method.
private void ReceivingAmqpLinkClosed(object sender, EventArgs e)
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(ReceivingAmqpLinkClosed));

Closed?.Invoke(this, e);

// After the Closed event handler has been invoked, the ReceivingAmqpLink has now been effectively cleaned up.
// This is a good point for us to detach the Closed event handler from the ReceivingAmqpLink instance.
_receivingAmqpLink.Closed -= ReceivingAmqpLinkClosed;

if (Logging.IsEnabled)
Logging.Exit(this, nameof(ReceivingAmqpLinkClosed));
}
Expand All @@ -58,7 +64,6 @@ internal bool IsClosing()

internal void SafeClose()
{
_receivingAmqpLink.Closed -= ReceivingAmqpLinkClosed;
_receivingAmqpLink.SafeClose();
}

Expand Down Expand Up @@ -166,9 +171,9 @@ private void OnMethodReceived(AmqpMessage amqpMessage)

try
{
DirectMethodRequest DirectMethodRequest = AmqpIotMessageConverter.ConstructMethodRequestFromAmqpMessage(amqpMessage, _payloadConvention);
DirectMethodRequest directMethodRequest = AmqpIotMessageConverter.ConstructMethodRequestFromAmqpMessage(amqpMessage, _payloadConvention);
DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome);
_onMethodReceived?.Invoke(DirectMethodRequest);
_onMethodReceived?.Invoke(directMethodRequest);
}
finally
{
Expand Down
7 changes: 6 additions & 1 deletion iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ public AmqpIotSendingLink(SendingAmqpLink sendingAmqpLink)
_sendingAmqpLink.Closed += SendingAmqpLinkClosed;
}

// This event handler is not invoked by the AMQP library in an async fashion.
// This also co-relates with the fact that SendingAmqpLink.SafeClose() is a sync method.
private void SendingAmqpLinkClosed(object sender, EventArgs e)
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(SendingAmqpLinkClosed));

Closed?.Invoke(this, e);

// After the Closed event handler has been invoked, the SendingAmqpLink has now been effectively cleaned up.
// This is a good point for us to detach the Closed event handler from the SendingAmqpLink instance.
_sendingAmqpLink.Closed -= SendingAmqpLinkClosed;

if (Logging.IsEnabled)
Logging.Exit(this, nameof(SendingAmqpLinkClosed));
}
Expand All @@ -48,7 +54,6 @@ internal void SafeClose()
if (Logging.IsEnabled)
Logging.Enter(this, nameof(SafeClose));

_sendingAmqpLink.Closed -= SendingAmqpLinkClosed;
_sendingAmqpLink.SafeClose();

if (Logging.IsEnabled)
Expand Down
7 changes: 6 additions & 1 deletion iothub/device/src/Transport/AmqpIot/AmqpIotSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ public AmqpIotSession(AmqpSession amqpSession)
_amqpSession.Closed += AmqpSessionClosed;
}

// This event handler is not invoked by the AMQP library in an async fashion.
// This also co-relates with the fact that AmqpSession.SafeClose() is a sync method.
private void AmqpSessionClosed(object sender, EventArgs e)
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(AmqpSessionClosed));

Closed?.Invoke(this, e);

// After the Closed event handler has been invoked, the AmqpSession has now been effectively cleaned up.
// This is a good point for us to detach the Closed event handler from the AmqpSession instance.
_amqpSession.Closed -= AmqpSessionClosed;

if (Logging.IsEnabled)
Logging.Exit(this, nameof(AmqpSessionClosed));
}
Expand All @@ -41,7 +47,6 @@ internal Task CloseAsync(CancellationToken cancellationToken)

internal void SafeClose()
{
_amqpSession.Closed -= AmqpSessionClosed;
_amqpSession.SafeClose();
}

Expand Down
2 changes: 1 addition & 1 deletion iothub/service/src/Amqp/AmqpSendingLinkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task OpenAsync(AmqpSession session, CancellationToken cancellationT
// By using a unique guid in the link's name, it becomes possible to correlate logs where a user
// may have multiple instances of this type of link open. It also makes it easier to correlate
// the state of this link with the service side logs if need be.
_linkName = "CloudToDevieMessageSenderLink-" + Guid.NewGuid();
_linkName = "CloudToDeviceMessageSenderLink-" + Guid.NewGuid();

if (Logging.IsEnabled)
Logging.Enter(this, $"Opening sending link with address {_linkAddress} and link name {_linkName}", nameof(OpenAsync));
Expand Down