Skip to content

Commit

Permalink
Make DirectMethodPrequest.Payload to be settable by client applicatio…
Browse files Browse the repository at this point in the history
…ns (#3317)
  • Loading branch information
abhipsaMisra authored May 2, 2023
1 parent 6489070 commit d6a4c3c
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 52 deletions.
1 change: 1 addition & 0 deletions SDK v2 migration guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ The device client and module client share a lot of API surface and underlying im
| `SetInputMessageHandlerAsync` | `SetIncomingMessageCallbackAsync` | The input parameter is deprecated; the callback can observe and filter using the `IncomingMessage.InputName` property. |
| `ModuleClient.SendEventAsync(string outputName, ...)` | `IotHubModuleClient.SendMessageToRouteAsync(string outputName, ...)` | Change the name to be more descriptive about sending messages between Edge modules.¹ |
| `ModuleClient.SendEventBatchAsync(string outputName, ...)` | `IotHubModuleClient.SendMessagesToRouteAsync(string outputName, ...)` | See¹ |
| `MethodRequest` | `EdgeModuleDirectMethodRequest` | Use full name of the operation type. Inherit from `DirectMethodRequest` to create a type that is meant to be used only with `IotHubModuleClient`. |

#### Notable additions

Expand Down
8 changes: 4 additions & 4 deletions e2e/LongHaul/module/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public async Task InitializeAsync(CancellationToken? ct = null)
_logger.Metric(ModuleClientOpenDelaySeconds, sw.Elapsed.TotalSeconds);
await _moduleClient.SetDirectMethodCallbackAsync(DirectMethodCallback).ConfigureAwait(false);
await _moduleClient.SetDesiredPropertyUpdateCallbackAsync(DesiredPropertyUpdateCallbackAsync).ConfigureAwait(false);
await _moduleClient.SetIncomingMessageCallbackAsync(OnM2mMessageReceivedAsync).ConfigureAwait(false);
await _moduleClient.SetIncomingMessageCallbackAsync(OnIncomingMessageReceivedAsync).ConfigureAwait(false);
}
finally
{
Expand All @@ -103,7 +103,7 @@ public async Task InitializeAsync(CancellationToken? ct = null)
/// Frequently send telemetry messages to the hub.
/// </summary>
/// <param name="ct">The cancellation token</param>
public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct)
public async Task SendMessagesToRouteAsync(Logger logger, CancellationToken ct)
{
// AMQP supports bulk telemetry sending, so we'll configure how many to send at a time.
int maxBulkMessages = _clientOptions.TransportSettings is IotHubClientAmqpSettings
Expand All @@ -118,7 +118,7 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
var sw = new Stopwatch();
while (!ct.IsCancellationRequested)
{
logger.Metric(ModuleMessageBacklog, _messagesToSend.Count);
logger.Metric(ModuleMessageToRouteBacklog, _messagesToSend.Count);

await Task.Delay(s_messageLoopSleepTime, ct).ConfigureAwait(false);

Expand Down Expand Up @@ -421,7 +421,7 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti
_logger.Metric(TotalTwinCallbacksToModuleHandled, _totalTwinCallbacksToModuleHandled);
}

private Task<MessageAcknowledgement> OnM2mMessageReceivedAsync(IncomingMessage receivedMessage)
private Task<MessageAcknowledgement> OnIncomingMessageReceivedAsync(IncomingMessage receivedMessage)
{
_logger.Trace($"Received the M2M message with Id {receivedMessage.MessageId}", TraceSeverity.Information);

Expand Down
2 changes: 1 addition & 1 deletion e2e/LongHaul/module/LoggingConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ internal static class LoggingConstants
public const string ModuleDisconnectedDurationSeconds = "ModuleDisconnectedDurationSeconds";
public const string TotalTelemetryMessagesToModuleSent = "TotalTelemetryMessagesToModuleSent";
public const string TelemetryMessageToModuleDelaySeconds = "TelemetryMessageToModuleDelaySeconds";
public const string ModuleMessageBacklog = "ModuleMessageBacklog";
public const string ModuleMessageToRouteBacklog = "ModuleMessageToRouteBacklog";
public const string C2mDirectMethodDelaySeconds = "C2mDirectMethodDelaySeconds";
public const string TotalTwinUpdatesToModuleReported = "TotalTwinUpdatesToModuleReported";
public const string TotalTwinCallbacksToModuleHandled = "TotalTwinCallbacksToModuleHandled";
Expand Down
2 changes: 1 addition & 1 deletion e2e/LongHaul/module/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private static async Task Main(string[] args)
await Task
.WhenAll(
systemHealthMonitor.RunAsync(cts.Token),
iotHub.SendTelemetryMessagesAsync(s_logger.Clone(), cts.Token),
iotHub.SendMessagesToRouteAsync(s_logger.Clone(), cts.Token),
iotHub.ReportReadOnlyPropertiesAsync(s_logger.Clone(), cts.Token))
.ConfigureAwait(false);
}
Expand Down
3 changes: 2 additions & 1 deletion e2e/LongHaul/prerequisites/LongHaulSetup.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ param(
$startTime = (Get-Date)

########################################################################################################
# Set error and warning preferences for the script to run.
# Set error behavior preference for the Powershell operating environment.
# For additional reading, see https://learn.microsoft.com/powershell/module/microsoft.powershell.core/about/about_preference_variables.
########################################################################################################

$ErrorActionPreference = "Stop"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace Microsoft.Azure.Devices.Client.Samples
public class EdgeModuleMethodSample
{
private readonly IotHubModuleClient _moduleClient;
private string _deviceId;
private string _moduleId;
private readonly string _deviceId;
private readonly string _moduleId;
private readonly TimeSpan? _maxRunTime;

public EdgeModuleMethodSample(IotHubModuleClient moduleClient, string deviceId, string moduleId, TimeSpan? maxRunTime)
Expand Down Expand Up @@ -54,7 +54,7 @@ public async Task RunSampleAsync()
}

// Invoking a direct method request to the module itself.
var directMethodRequest = new DirectMethodRequest("ModuleToModule");
var directMethodRequest = new EdgeModuleDirectMethodRequest("ModuleToModule");
await _moduleClient.InvokeMethodAsync(_deviceId, _moduleId, directMethodRequest, cts.Token);

// You can unsubscribe from receiving a callback for direct methods by setting a null callback handler.
Expand Down
10 changes: 5 additions & 5 deletions iothub/device/src/DirectMethod/DirectMethodRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ internal DirectMethodRequest()
/// <summary>
/// Initialize an instance of this class.
/// </summary>
/// <param name="methodName">The method name to invoke.</param>
/// <remarks>
/// A direct method request can only be made by the service or a module;
/// a device client app will not need to instantiate this class.
/// This class can be inherited from and set by unit tests for mocking purposes.
/// </remarks>
public DirectMethodRequest(string methodName)
/// <param name="methodName">The method name to invoke.</param>
protected internal DirectMethodRequest(string methodName)
{
MethodName = methodName;
}
Expand Down Expand Up @@ -84,6 +83,7 @@ public DirectMethodRequest(string methodName)
/// <summary>
/// The direct method payload.
/// </summary>
[JsonProperty("payload", NullValueHandling = NullValueHandling.Include)]
protected internal byte[] Payload { get; set; }

/// <summary>
Expand Down Expand Up @@ -186,7 +186,7 @@ public bool TryGetPayload<T>(out T payload)
/// </example>
public byte[] GetPayloadAsBytes()
{
return Payload == null || Payload.Length==0 ? null : (byte[])Payload.Clone();
return Payload == null || Payload.Length == 0 ? null : (byte[])Payload.Clone();
}
}
}
47 changes: 47 additions & 0 deletions iothub/device/src/DirectMethod/EdgeModuleDirectMethodRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.Devices.Client
{
/// <summary>
/// Parameters to execute a direct method on an edge device or an edge module by an <see cref="IotHubModuleClient"/>.
/// </summary>
public class EdgeModuleDirectMethodRequest : DirectMethodRequest
{
/// <summary>
/// A direct method request to be initialized by the client application when using an <see cref="IotHubModuleClient"/> for invoking
/// a direct method on an edge device or an edge module connected to the same edge hub.
/// </summary>
/// <param name="methodName">The method name to invoke.</param>
public EdgeModuleDirectMethodRequest(string methodName)
: this(methodName, null)
{
}

/// <summary>
/// A direct method request to be initialized by the client application when using an <see cref="IotHubModuleClient"/> for invoking
/// a direct method on an edge device or an edge module connected to the same edge hub.
/// </summary>
/// <param name="methodName">The method name to invoke.</param>
/// <param name="payload">The direct method payload bytes.</param>
public EdgeModuleDirectMethodRequest(string methodName, byte[] payload)
: base(methodName)
{
Payload = payload;
}

/// <summary>
/// A direct method request to be initialized by the client application when using an <see cref="IotHubModuleClient"/> for invoking
/// a direct method on an edge device or an edge module connected to the same edge hub.
/// </summary>
/// <param name="methodName">The method name to invoke.</param>
/// <param name="payload">The direct method payload that will be serialized and encoded per <see cref="IotHubClientOptions.PayloadConvention"/>.</param>
public EdgeModuleDirectMethodRequest(string methodName, object payload)
: base(methodName)
{
Payload = payload == null
? null
: PayloadConvention.GetObjectBytes(payload);
}
}
}
2 changes: 1 addition & 1 deletion iothub/device/src/IotHubClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public IotHubClientOptions(IotHubClientTransportSettings transportSettings)
/// <remarks>
/// All <see cref="IotHubDeviceClient"/> file upload operations take place over HTTP regardless of the configured protocol.
/// Additionally, all <see cref="IotHubModuleClient"/> direct method invoking operations (such as
/// <see cref="IotHubModuleClient.InvokeMethodAsync(string, DirectMethodRequest, System.Threading.CancellationToken)"/>)
/// <see cref="IotHubModuleClient.InvokeMethodAsync(string, EdgeModuleDirectMethodRequest, System.Threading.CancellationToken)"/>)
/// take place over HTTP as well. The settings provided in this class will be used for all these operations.
/// </remarks>
public IotHubClientHttpSettings HttpOperationTransportSettings { get; set; } = new IotHubClientHttpSettings();
Expand Down
10 changes: 5 additions & 5 deletions iothub/device/src/IotHubModuleClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ public async Task SendMessagesToRouteAsync(string outputName, IEnumerable<Teleme
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
/// <example>
/// <code language="csharp">
/// DirectMethodResponse response = await client.InvokeMethodAsync(deviceId, new DirectMethodRequest(methodName), cancellationToken);
/// DirectMethodResponse response = await client.InvokeMethodAsync(deviceId, new EdgeModuleDirectMethodRequest(methodName), cancellationToken);
/// </code>
/// </example>
public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, DirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, EdgeModuleDirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrWhiteSpace(deviceId, nameof(deviceId));
Argument.AssertNotNull(methodRequest, nameof(methodRequest));
Expand Down Expand Up @@ -275,10 +275,10 @@ public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, DirectMetho
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
/// <example>
/// <code language="csharp">
/// DirectMethodResponse response = await client.InvokeMethodAsync(deviceId, moduleId, new DirectMethodRequest(methodName), cancellationToken);
/// DirectMethodResponse response = await client.InvokeMethodAsync(deviceId, moduleId, new EdgeModuleDirectMethodRequest(methodName), cancellationToken);
/// </code>
/// </example>
public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, string moduleId, DirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, string moduleId, EdgeModuleDirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
{
Argument.AssertNotNullOrWhiteSpace(deviceId, nameof(deviceId));
Argument.AssertNotNullOrWhiteSpace(moduleId, nameof(moduleId));
Expand All @@ -288,7 +288,7 @@ public Task<DirectMethodResponse> InvokeMethodAsync(string deviceId, string modu
return InvokeMethodAsync(GetModuleMethodUri(deviceId, moduleId), methodRequest, cancellationToken);
}

private async Task<DirectMethodResponse> InvokeMethodAsync(Uri uri, DirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
private async Task<DirectMethodResponse> InvokeMethodAsync(Uri uri, EdgeModuleDirectMethodRequest methodRequest, CancellationToken cancellationToken = default)
{
methodRequest.PayloadConvention = _clientOptions.PayloadConvention;
DirectMethodResponse result = await InnerHandler.InvokeMethodAsync(methodRequest, uri, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ await ValidateStateAndPerformOperationAsync(
}
}

public override async Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
public override async Task<DirectMethodResponse> InvokeMethodAsync(EdgeModuleDirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, methodInvokeRequest.RequestId, uri, cancellationToken, nameof(InvokeMethodAsync));
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public virtual Task CompleteFileUploadAsync(FileUploadCompletionNotification not
return NextHandler?.CompleteFileUploadAsync(notification, cancellationToken) ?? Task.CompletedTask;
}

public virtual Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
public virtual Task<DirectMethodResponse> InvokeMethodAsync(EdgeModuleDirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return NextHandler?.InvokeMethodAsync(methodInvokeRequest, uri, cancellationToken) ?? Task.FromResult<DirectMethodResponse>(null);
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/ExceptionRemappingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public override Task CompleteFileUploadAsync(FileUploadCompletionNotification no
return ExecuteWithExceptionRemappingAsync(() => base.CompleteFileUploadAsync(notification, cancellationToken));
}

public override Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
public override Task<DirectMethodResponse> InvokeMethodAsync(EdgeModuleDirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
{
return RunWithExceptionRemappingAsync(() => base.InvokeMethodAsync(methodInvokeRequest, uri, cancellationToken));
}
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/IDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal interface IDelegatingHandler : IContinuationProvider<IDelegatingHandler
Task CompleteFileUploadAsync(FileUploadCompletionNotification notification, CancellationToken cancellationToken);

// This is for invoking methods from an edge module to another edge device or edge module.
Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken);
Task<DirectMethodResponse> InvokeMethodAsync(EdgeModuleDirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken);

// Sas token validity
Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ await _internalRetryHandler
}
}

public override async Task<DirectMethodResponse> InvokeMethodAsync(DirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
public override async Task<DirectMethodResponse> InvokeMethodAsync(EdgeModuleDirectMethodRequest methodInvokeRequest, Uri uri, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, methodInvokeRequest.RequestId, uri, cancellationToken, nameof(InvokeMethodAsync));
Expand Down
22 changes: 3 additions & 19 deletions iothub/device/src/Transport/Http/HttpClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,9 @@ public async Task<T2> PostAsync<T1, T2>(
AddCustomHeaders(msg, customHeaders);
if (entity != null)
{
if (typeof(T1) == typeof(byte[]))
{
msg.Content = new ByteArrayContent((byte[])(object)entity);
}
else if (typeof(T1) == typeof(string))
{
// only used to send batched messages on Http runtime
msg.Content = new StringContent((string)(object)entity);
msg.Content.Headers.ContentType = new MediaTypeHeaderValue(CommonConstants.BatchedMessageContentType);
}
else
{
msg.Content = CreateContent(entity);
}
// Set the complete entity object into the HttpRequestMessage content. This includes the user-defined payload (if applicable)
// and all associated metadata. The content is set as per service-defined contract, i.e. UTF-8 encoded json string.
msg.Content = new StringContent(JsonConvert.SerializeObject(entity), Encoding.UTF8, "application/json");
}

HttpResponseMessage responseMsg;
Expand Down Expand Up @@ -246,11 +235,6 @@ private static async Task<Exception> MapToExceptionAsync(
return await exception.ConfigureAwait(false);
}

private static StringContent CreateContent<T>(T entity)
{
return new StringContent(JsonConvert.SerializeObject(entity), Encoding.UTF8, "application/json");
}

private static async Task<T> ReadAsAsync<T>(HttpContent content, CancellationToken token)
{
token.ThrowIfCancellationRequested();
Expand Down
4 changes: 2 additions & 2 deletions iothub/device/src/Transport/Http/HttpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ await _httpClientHelper

// This is for invoking methods from an edge module to another edge device or edge module.
public override async Task<DirectMethodResponse> InvokeMethodAsync(
DirectMethodRequest methodInvokeRequest,
EdgeModuleDirectMethodRequest methodInvokeRequest,
Uri uri,
CancellationToken cancellationToken)
{
Expand All @@ -89,7 +89,7 @@ public override async Task<DirectMethodResponse> InvokeMethodAsync(
};

return await _httpClientHelper
.PostAsync<DirectMethodRequest, DirectMethodResponse>(
.PostAsync<EdgeModuleDirectMethodRequest, DirectMethodResponse>(
uri,
methodInvokeRequest,
customHeaders,
Expand Down
Loading

0 comments on commit d6a4c3c

Please sign in to comment.