Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prepare streaming in management sdk. #2129

Merged
merged 7 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Net.Http;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class BinaryPayloadContentBuilder : IPayloadContentBuilder
{
internal class BinaryPayloadContentBuilder : IPayloadContentBuilder
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;

public BinaryPayloadContentBuilder(IReadOnlyList<IHubProtocol> hubProtocols)
{
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;
public BinaryPayloadContentBuilder(IReadOnlyList<IHubProtocol> hubProtocols)
{
_hubProtocols = hubProtocols;
}
_hubProtocols = hubProtocols;
}

public HttpContent? Build(PayloadMessage? payload)
{
return payload == null ? null : (HttpContent)new BinaryPayloadMessageContent(payload, _hubProtocols);
}
public HttpContent? Build(HubMessage? payload, Type? typeHint)
{
return payload == null ? null : (HttpContent)new BinaryPayloadMessageContent(payload, _hubProtocols);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
Expand All @@ -15,51 +15,50 @@

using Microsoft.AspNetCore.SignalR.Protocol;

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class BinaryPayloadMessageContent : HttpContent
{
internal class BinaryPayloadMessageContent : HttpContent
private static readonly Dictionary<string, byte[]> ProtocolMap = new(2)
{
private static readonly Dictionary<string, byte[]> ProtocolMap = new Dictionary<string, byte[]>(2)
{
{Constants.Protocol.Json, Encoding.UTF8.GetBytes(Constants.Protocol.Json) },
{Constants.Protocol.MessagePack,Encoding.UTF8.GetBytes(Constants.Protocol.MessagePack)}
};
private static readonly MediaTypeHeaderValue ContentType = new("application/octet-stream");
{Constants.Protocol.Json, Encoding.UTF8.GetBytes(Constants.Protocol.Json) },
{Constants.Protocol.MessagePack,Encoding.UTF8.GetBytes(Constants.Protocol.MessagePack)}
};

private readonly PayloadMessage _payloadMessage;
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;
private static readonly MediaTypeHeaderValue ContentType = new("application/octet-stream");

public BinaryPayloadMessageContent(PayloadMessage payloadMessage, IReadOnlyList<IHubProtocol> hubProtocols)
{
_payloadMessage = payloadMessage ?? throw new ArgumentNullException(nameof(payloadMessage));
_hubProtocols = hubProtocols ?? throw new ArgumentNullException(nameof(hubProtocols));
Headers.ContentType = ContentType;
}
private readonly HubMessage _payloadMessage;
private readonly IReadOnlyList<IHubProtocol> _hubProtocols;

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using var memoryBufferWriter = new MemoryBufferWriter();
WriteMessageCore(memoryBufferWriter);
await memoryBufferWriter.CopyToAsync(stream);
}
public BinaryPayloadMessageContent(HubMessage payloadMessage, IReadOnlyList<IHubProtocol> hubProtocols)
{
_payloadMessage = payloadMessage ?? throw new ArgumentNullException(nameof(payloadMessage));
_hubProtocols = hubProtocols ?? throw new ArgumentNullException(nameof(hubProtocols));
Headers.ContentType = ContentType;
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
using var memoryBufferWriter = new MemoryBufferWriter();
WriteMessageCore(memoryBufferWriter);
await memoryBufferWriter.CopyToAsync(stream);
}

private void WriteMessageCore(IBufferWriter<byte> bufferWriter)
protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}

private void WriteMessageCore(IBufferWriter<byte> bufferWriter)
{
var messagePackWriter = new MessagePackWriter(bufferWriter);
messagePackWriter.WriteMapHeader(_hubProtocols.Count);
foreach (var hubProtocol in _hubProtocols)
{
var invocationMessage = new InvocationMessage(_payloadMessage.Target, _payloadMessage.Arguments);
var messagePackWriter = new MessagePackWriter(bufferWriter);
messagePackWriter.WriteMapHeader(_hubProtocols.Count);
foreach (var hubProtocol in _hubProtocols)
{
messagePackWriter.WriteString(ProtocolMap[hubProtocol.Name]);
messagePackWriter.Write(hubProtocol.GetMessageBytes(invocationMessage).Span);
}
messagePackWriter.Flush();
messagePackWriter.WriteString(ProtocolMap[hubProtocol.Name]);
messagePackWriter.Write(hubProtocol.GetMessageBytes(_payloadMessage).Span);
}
messagePackWriter.Flush();
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Net.Http;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable

namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal interface IPayloadContentBuilder
{
internal interface IPayloadContentBuilder
{
HttpContent? Build(PayloadMessage? payload);
}
HttpContent? Build(HubMessage? payload, Type? typeHint);
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Net.Http;

using Azure.Core.Serialization;

using Microsoft.AspNetCore.SignalR.Protocol;

#nullable enable
namespace Microsoft.Azure.SignalR.Common
namespace Microsoft.Azure.SignalR.Common;

internal class JsonPayloadContentBuilder : IPayloadContentBuilder
{
internal class JsonPayloadContentBuilder : IPayloadContentBuilder
{
private readonly ObjectSerializer _jsonObjectSerializer;
private readonly ObjectSerializer _jsonObjectSerializer;

public JsonPayloadContentBuilder(ObjectSerializer jsonObjectSerializer)
{
_jsonObjectSerializer = jsonObjectSerializer;
}
public JsonPayloadContentBuilder(ObjectSerializer jsonObjectSerializer)
{
_jsonObjectSerializer = jsonObjectSerializer;
}

public HttpContent? Build(PayloadMessage? payload)
{
return payload == null ? null : new JsonPayloadMessageContent(payload, _jsonObjectSerializer);
}
public HttpContent? Build(HubMessage? payload, Type? typeHint)
{
return payload == null ? null : new JsonPayloadMessageContent(payload, _jsonObjectSerializer, typeHint);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;
using System.Net;
using System.Net.Http;
Expand All @@ -10,45 +11,55 @@

using Azure.Core.Serialization;

namespace Microsoft.Azure.SignalR
using Microsoft.AspNetCore.SignalR.Protocol;

namespace Microsoft.Azure.SignalR;

internal class JsonPayloadMessageContent : HttpContent
{
internal class JsonPayloadMessageContent : HttpContent
private static readonly MediaTypeHeaderValue ContentType = new("application/json")
{
private static readonly MediaTypeHeaderValue ContentType = new("application/json")
{
CharSet = "utf-8"
};
private static readonly JsonWriterOptions JsonWriterOptions = new()
{
// We must skip validation because what we break the writing midway and write JSON in other ways.
SkipValidation = true
};
private readonly PayloadMessage _payloadMessage;
private readonly ObjectSerializer _jsonObjectSerializer;
CharSet = "utf-8"
};
private static readonly JsonWriterOptions JsonWriterOptions = new()
{
// We must skip validation because what we break the writing midway and write JSON in other ways.
SkipValidation = true
};
private readonly HubMessage _payloadMessage;
private readonly ObjectSerializer _jsonObjectSerializer;
private readonly Type _typeHint;

public JsonPayloadMessageContent(PayloadMessage payloadMessage, ObjectSerializer jsonObjectSerializer)
{
_payloadMessage = payloadMessage ?? throw new System.ArgumentNullException(nameof(payloadMessage));
_jsonObjectSerializer = jsonObjectSerializer;
Headers.ContentType = ContentType;
}
public JsonPayloadMessageContent(HubMessage payloadMessage, ObjectSerializer jsonObjectSerializer, Type typeHint)
{
_payloadMessage = payloadMessage ?? throw new System.ArgumentNullException(nameof(payloadMessage));
_jsonObjectSerializer = jsonObjectSerializer;
_typeHint = typeHint;
Headers.ContentType = ContentType;
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
if (_payloadMessage is InvocationMessage invocationMessage)
{
using var jsonWriter = new Utf8JsonWriter(stream, JsonWriterOptions);
jsonWriter.WriteStartObject();
jsonWriter.WriteString(nameof(PayloadMessage.Target), _payloadMessage.Target);
jsonWriter.WriteString(nameof(PayloadMessage.Target), invocationMessage.Target);
jsonWriter.WritePropertyName(nameof(PayloadMessage.Arguments));
await jsonWriter.FlushAsync();
await _jsonObjectSerializer.SerializeAsync(stream, _payloadMessage.Arguments, typeof(object[]), default);
await _jsonObjectSerializer.SerializeAsync(stream, invocationMessage.Arguments, typeof(object[]), default);
jsonWriter.WriteEndObject();
await jsonWriter.FlushAsync();
}

protected override bool TryComputeLength(out long length)
else if (_payloadMessage is StreamItemMessage streamItemMessage)
{
length = 0;
return false;
await _jsonObjectSerializer.SerializeAsync(stream, streamItemMessage.Item, _typeHint, default);
}
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
}
Loading