Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove telemetry batch operation over MQTT #3000

Merged
merged 7 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion SDK v2 migration guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ Find a client you currently use below, read the table of API name changes and us
- A new property has been added with a recommended action, which a device developer may observe or ignore.
- The file upload method has been split into the three individual steps that this method used to take. See [this file upload sample](./iothub/device/samples/getting%20started/FileUploadSample/) for an example of how to do file upload using these discrete steps.
- Cloud-to-device messages can be received by calling `SetMessageIncomingMessageCallbackAsync` and providing a callback. Users no longer need to poll for messages with `ReceiveAsync`.
- Support for sending a batch of events over MQTT by calling `SendEventBatchAsync` has been removed. MQTT v3.1 does not support true batching but instead sends the messages one after another. True batching is still supported over AMQP.
- Several callback handler set methods and definitions have changed, losing the `userContext` parameter which was deemed unnecessary and a vestige of the C device client.
- The exponential back-off retry policy has updated parameters and logic.
- Remote certificate validation is no natively longer supported for AMQP web socket connections.
Expand Down Expand Up @@ -161,7 +162,7 @@ Find a client you currently use below, read the table of API name changes and us
| `DeviceClient` | `IotHubDeviceClient` | Specify the service it is a device client for. |
| `DeviceClient.Dispose()` | `IotHubDeviceClient.DisposeAsync()` | Ensures the client is closed before disposing. |
| `DeviceClient.SendEventAsync(...)` | `IotHubDeviceClient.SendTelemetryAsync(...)` | Even our public documentation calls this telemetry, so we renamed the method to describe this better.¹ |
| `DeviceClient.SendEventBatchAsync(...)` | `IotHubDeviceClient.SendTelemetryBatchAsync(...)` | See¹ |
| `DeviceClient.SendEventBatchAsync(...)` | `IotHubDeviceClient.SendTelemetryBatchAsync(...)` | This is now only supported over AMQP. Support over MQTT has been removed. Also, see¹. |
| `DeviceClient.SetConnectionStatusChangesHandler(...)` | `IotHubDeviceClient.ConnectionStatusChangeCallback` | Local operation doesn't require being a method. |
| `DeviceClient.SetReceiveMessageHandlerAsync(...)` | `IotHubDeviceClient.SetIncomingMessageCallbackAsync(...)` | Disambiguate from telemetry messages. |
| `DeviceClient.GetTwinAsync(...)` | `IotHubDeviceClient.GetTwinPropertiesAsync(...)` | The device client doesn't get the full twin, just the properties so this helps avoid that confusion.² |
Expand Down
123 changes: 57 additions & 66 deletions e2e/test/iothub/device/TelemetryE2eTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -22,9 +23,6 @@ public partial class TelemetryE2ETests : E2EMsTestBase
// The maximum message size for device to cloud messages is 256 KB. We are allowing 1 KB of buffer for message header information etc.
private const int LargeMessageSizeInBytes = 255 * 1024;

// The size of a device to cloud message. This exceeds the the maximum message size set by the hub; 256 KB.
private const int ExceedAllowedMessageSizeInBytes = 300 * 1024;

// The size of a device to cloud message. This overly exceeds the maximum message size set by the hub, which is 256 KB.
// The reason why we are testing for this case is because we noticed a different behavior between this case, and the case where
// the message size is less than 1 MB.
Expand All @@ -36,33 +34,42 @@ public partial class TelemetryE2ETests : E2EMsTestBase

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleMessage_Amqp(IotHubClientTransportProtocol protocol)
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleMessage_Mqtt(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
await SendSingleMessage(TestDeviceType.Sasl, new IotHubClientAmqpSettings(protocol)).ConfigureAwait(false);
await SendSingleMessage(testDeviceType, new IotHubClientMqttSettings(protocol)).ConfigureAwait(false);
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleMessage_Mqtt(IotHubClientTransportProtocol protocol)
[Timeout(LongRunningTestTimeoutMilliseconds)]
[TestCategory("Proxy")]
[TestCategory("LongRunning")]
public async Task Message_DeviceSendSingleMessage_MqttWs_WithProxy()
{
await SendSingleMessage(TestDeviceType.Sasl, new IotHubClientMqttSettings(protocol)).ConfigureAwait(false);
var mqttTransportSettings = new IotHubClientMqttSettings(IotHubClientTransportProtocol.WebSocket)
{
Proxy = new WebProxy(s_proxyServerAddress),
};

await SendSingleMessage(TestDeviceType.X509, mqttTransportSettings).ConfigureAwait(false);
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleMessage_Amqp_WithHeartbeats(IotHubClientTransportProtocol protocol)
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleMessage_Amqp_WithHeartbeats(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
var amqpTransportSettings = new IotHubClientAmqpSettings(protocol)
{
IdleTimeout = TimeSpan.FromMinutes(2),
};
await SendSingleMessage(TestDeviceType.Sasl, amqpTransportSettings).ConfigureAwait(false);
await SendSingleMessage(testDeviceType, amqpTransportSettings).ConfigureAwait(false);
}

[TestMethod]
Expand All @@ -76,68 +83,57 @@ public async Task Message_DeviceSendSingleMessage_AmqpWs_WithProxy()
Proxy = new WebProxy(s_proxyServerAddress),
};

await SendSingleMessage(TestDeviceType.Sasl, amqpTransportSettings).ConfigureAwait(false);
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task X509_DeviceSendSingleMessage_Amqp(IotHubClientTransportProtocol protocol)
{
await SendSingleMessage(TestDeviceType.X509, new IotHubClientAmqpSettings(protocol)).ConfigureAwait(false);
}

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
[TestCategory("LongRunning")]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task X509_DeviceSendSingleMessage_Mqtt(IotHubClientTransportProtocol protocol)
{
await SendSingleMessage(TestDeviceType.X509, new IotHubClientMqttSettings(protocol)).ConfigureAwait(false);
await SendSingleMessage(TestDeviceType.X509, amqpTransportSettings).ConfigureAwait(false);
}

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
[TestCategory("LongRunning")]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task X509_DeviceSendBatchMessages_Amqp(IotHubClientTransportProtocol protocol)
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendBatchMessages_Amqp(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
await SendBatchMessages(TestDeviceType.X509, new IotHubClientAmqpSettings(protocol)).ConfigureAwait(false);
await SendBatchMessages(testDeviceType, new IotHubClientAmqpSettings(protocol)).ConfigureAwait(false);
}

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
[TestCategory("LongRunning")]
[DataRow(IotHubClientTransportProtocol.Tcp)]
[DataRow(IotHubClientTransportProtocol.WebSocket)]
public async Task X509_DeviceSendBatchMessages_Mqtt(IotHubClientTransportProtocol protocol)
[DataTestMethod]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleLargeMessageAsync_Amqp(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
await SendBatchMessages(TestDeviceType.X509, new IotHubClientMqttSettings(protocol)).ConfigureAwait(false);
var transportSettings = new IotHubClientAmqpSettings(protocol);
await SendSingleMessage(testDeviceType, transportSettings, LargeMessageSizeInBytes);
}

[DataTestMethod]
[DataRow(IotHubClientTransportProtocol.Tcp, TestDeviceType.Sasl)]
[DataRow(IotHubClientTransportProtocol.Tcp, TestDeviceType.X509)]
[DataRow(IotHubClientTransportProtocol.WebSocket, TestDeviceType.Sasl)]
[DataRow(IotHubClientTransportProtocol.WebSocket, TestDeviceType.X509)]
public async Task Message_DeviceSendSingleLargeMessageAsync_Amqp(IotHubClientTransportProtocol protocol, TestDeviceType testDeviceType)
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleLargeMessageAsync_Mqtt(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
var transportSettings = new IotHubClientAmqpSettings(protocol);
await Message_DeviceSendSingleLargeMessageAsync(testDeviceType, transportSettings);
var transportSettings = new IotHubClientMqttSettings(protocol);
await SendSingleMessage(testDeviceType, transportSettings, LargeMessageSizeInBytes);
}

// We cannot test this over MQTT since MQTT will disconnect the client if it receives an invalid payload.
[DataTestMethod]
[DataRow(IotHubClientTransportProtocol.Tcp, TestDeviceType.Sasl)]
[DataRow(IotHubClientTransportProtocol.Tcp, TestDeviceType.X509)]
[DataRow(IotHubClientTransportProtocol.WebSocket, TestDeviceType.Sasl)]
[DataRow(IotHubClientTransportProtocol.WebSocket, TestDeviceType.X509)]
public async Task Message_DeviceSendSingleLargeMessageAsync_Mqtt(IotHubClientTransportProtocol protocol, TestDeviceType testDeviceType)
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.Sasl, IotHubClientTransportProtocol.WebSocket)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.Tcp)]
[DataRow(TestDeviceType.X509, IotHubClientTransportProtocol.WebSocket)]
public async Task Message_DeviceSendSingleOverlyLargeMessageAsync_Amqp(TestDeviceType testDeviceType, IotHubClientTransportProtocol protocol)
{
var transportSettings = new IotHubClientMqttSettings(protocol);
await Message_DeviceSendSingleLargeMessageAsync(testDeviceType, transportSettings);
var transportSettings = new IotHubClientAmqpSettings(protocol);
Func<Task> actionAsync = async () => await SendSingleMessage(testDeviceType, transportSettings, OverlyExceedAllowedMessageSizeInBytes);
await actionAsync
.Should()
.ThrowAsync<IotHubClientException>()
.Where(ex => ex.ErrorCode == IotHubClientErrorCode.MessageTooLarge);
}

[TestMethod]
Expand All @@ -158,11 +154,6 @@ public async Task Message_DeviceOpenCloseOpenSendSingleMessage_Mqtt(IotHubClient
await OpenCloseOpenThenSendSingleMessage(TestDeviceType.Sasl, new IotHubClientMqttSettings(protocol)).ConfigureAwait(false);
}

private async Task Message_DeviceSendSingleLargeMessageAsync(TestDeviceType testDeviceType, IotHubClientTransportSettings transportSettings)
{
await SendSingleMessage(testDeviceType, transportSettings, LargeMessageSizeInBytes).ConfigureAwait(false);
}

private async Task SendSingleMessage(TestDeviceType type, IotHubClientTransportSettings transportSettings, int messageSize = 0)
{
using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(_devicePrefix, type).ConfigureAwait(false);
Expand Down
6 changes: 5 additions & 1 deletion iothub/device/src/IotHubBaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ public async Task SendTelemetryAsync(TelemetryMessage message, CancellationToken
/// <remarks>
/// The client instance must be opened already.
/// <para>
/// Use AMQP for a true batch operation. MQTT will just send the messages one after the other.
/// This operation is supported only over AMQP.
/// </para>
/// <para>
/// This operation is not supported over MQTT and will result in an <see cref="InvalidOperationException"/>.
/// </para>
/// <para>
/// For more information on IoT Edge module routing for <see cref="IotHubModuleClient"/> see
Expand All @@ -178,6 +181,7 @@ public async Task SendTelemetryAsync(TelemetryMessage message, CancellationToken
/// <param name="messages">An <see cref="IEnumerable{Message}"/> set of message objects.</param>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
/// <exception cref="InvalidOperationException">Thrown if the client instance is not opened already.</exception>
/// <exception cref="InvalidOperationException">When this method is called when the client is configured to use MQTT.</exception>
/// <exception cref="OperationCanceledException">Thrown when the operation has been canceled.</exception>
public async Task SendTelemetryBatchAsync(IEnumerable<TelemetryMessage> messages, CancellationToken cancellationToken = default)
{
Expand Down
9 changes: 2 additions & 7 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,9 @@ public override async Task SendTelemetryAsync(TelemetryMessage message, Cancella
}
}

public override async Task SendTelemetryBatchAsync(IEnumerable<TelemetryMessage> messages, CancellationToken cancellationToken)
public override Task SendTelemetryBatchAsync(IEnumerable<TelemetryMessage> messages, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

// Note that this sends all messages at once and then waits for all the acknowledgements. This
// is the recommended pattern for sending large numbers of messages over an asynchronous
// protocol like MQTT
await Task.WhenAll(messages.Select(x => SendTelemetryAsync(x, cancellationToken))).ConfigureAwait(false);
throw new InvalidOperationException("This operation is not supported over MQTT. Please refer to the API comments for additional details.");
}

public override async Task EnableMethodsAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Note that you can configure your TLS protocol version and ciphers by following [
| Features | mqtt | mqtt-ws | amqp | amqp-ws | Description |
|------------------------------------------------------------------------------------------------------------------|---------------------|---------------------|---------------------|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [Authentication](https://docs.microsoft.com/azure/iot-hub/iot-hub-security-deployment) | :heavy_check_mark: | :heavy_check_mark:* | :heavy_check_mark: | :heavy_check_mark:* | Connect your device to IoT Hub securely with supported authentication, including private key, SASToken, X-509 Self Signed and X-509 CA Signed. </br> *IoT Hub only supports X-509 CA Signed over AMQP over TCP and MQTT over TCP at the moment. X509-CA authentication over websocket is not supported. |
| [Send device-to-cloud message](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-d2c) | :heavy_check_mark:* | :heavy_check_mark:* | :heavy_check_mark: | :heavy_check_mark: | Send device-to-cloud messages (max 256KB) to IoT Hub with the option to add application properties and system properties, and batch send. </br> *IoT Hub only supports batch send over AMQP at the moment. The MQTT implementation loops over the batch and sends each message individually. |
| [Send device-to-cloud message](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-d2c) | :heavy_check_mark:* | :heavy_check_mark:* | :heavy_check_mark: | :heavy_check_mark: | Send device-to-cloud messages (max 256KB) to IoT Hub with the option to add application properties and system properties, and batch send. </br> *IoT Hub only supports batch send over AMQP at the moment. |
| [Receive cloud-to-device messages](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d) | :heavy_check_mark:* | :heavy_check_mark:* | :heavy_check_mark: | :heavy_check_mark: | Receive cloud-to-device messages and read associated application and system properties from IoT Hub, with the option to complete/reject/abandon C2D messages. </br> *IoT Hub does not support the option to reject/abandon C2D messages over MQTT at the moment. |
| [Device Twins](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-device-twins) | :heavy_check_mark:* | :heavy_check_mark:* | :heavy_check_mark:* | :heavy_check_mark:* | IoT Hub persists a device twin for each device that you connect to IoT Hub. The device can perform operations like get twin tags, subscribe to desired properties. </br> *Send reported properties version and desired properties version are in progress. |
| [Direct Methods](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-direct-methods) | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | IoT Hub gives you the ability to invoke direct methods on devices from the cloud. The SDK supports handler for method specific and generic operation. |
Expand Down