Skip to content

Commit

Permalink
Added ResponseContent and helper methods events.
Browse files Browse the repository at this point in the history
  • Loading branch information
ManickaP committed Aug 24, 2020
1 parent 3f00240 commit ec189d4
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 94 deletions.
300 changes: 206 additions & 94 deletions src/libraries/System.Net.Http/src/System/Net/Http/HttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,50 +164,81 @@ public Task<string> GetStringAsync(string? requestUri, CancellationToken cancell
GetStringAsync(CreateUri(requestUri), cancellationToken);

public Task<string> GetStringAsync(Uri? requestUri, CancellationToken cancellationToken) =>
GetStringAsyncCore(GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken), cancellationToken);
GetStringAsyncCore(requestUri, cancellationToken);

private async Task<string> GetStringAsyncCore(Task<HttpResponseMessage> getTask, CancellationToken cancellationToken)
private async Task<string> GetStringAsyncCore(Uri? requestUri, CancellationToken cancellationToken)
{
// Wait for the response message.
using (HttpResponseMessage responseMessage = await getTask.ConfigureAwait(false))
bool telemetryStarted = false;
bool responseTelemetryStarted = false;

if (HttpTelemetry.Log.IsEnabled())
{
// Make sure it completed successfully.
responseMessage.EnsureSuccessStatusCode();
HttpTelemetry.Log.GetHelperStart(nameof(GetStringAsync));
telemetryStarted = true;
}

// Get the response content.
HttpContent? c = responseMessage.Content;
if (c != null)
try
{
// Wait for the response message.
using (HttpResponseMessage responseMessage = await GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false))
{
#if NET46
return await c.ReadAsStringAsync().ConfigureAwait(false);
#else
HttpContentHeaders headers = c.Headers;

// Since the underlying byte[] will never be exposed, we use an ArrayPool-backed
// stream to which we copy all of the data from the response.
using (Stream responseStream = c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
using (var buffer = new HttpContent.LimitArrayPoolWriteStream(_maxResponseContentBufferSize, (int)headers.ContentLength.GetValueOrDefault()))
// Make sure it completed successfully.
responseMessage.EnsureSuccessStatusCode();

// Get the response content.
HttpContent? c = responseMessage.Content;
if (c != null)
{
try
if (HttpTelemetry.Log.IsEnabled())
{
await responseStream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
HttpTelemetry.Log.ResponseContentStart();
responseTelemetryStarted = true;
}
catch (Exception e) when (HttpContent.StreamCopyExceptionNeedsWrapping(e))
#if NET46
return await c.ReadAsStringAsync().ConfigureAwait(false);
#else
HttpContentHeaders headers = c.Headers;

// Since the underlying byte[] will never be exposed, we use an ArrayPool-backed
// stream to which we copy all of the data from the response.
using (Stream responseStream = c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
using (var buffer = new HttpContent.LimitArrayPoolWriteStream(_maxResponseContentBufferSize, (int)headers.ContentLength.GetValueOrDefault()))
{
throw HttpContent.WrapStreamCopyException(e);
}
try
{
await responseStream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (HttpContent.StreamCopyExceptionNeedsWrapping(e))
{
throw HttpContent.WrapStreamCopyException(e);
}

if (buffer.Length > 0)
{
// Decode and return the data from the buffer.
return HttpContent.ReadBufferAsString(buffer.GetBuffer(), headers);
if (buffer.Length > 0)
{
// Decode and return the data from the buffer.
return HttpContent.ReadBufferAsString(buffer.GetBuffer(), headers);
}
}
#endif
}
#endif
}

// No content to return.
return string.Empty;
// No content to return.
return string.Empty;
}
}
finally
{
if (HttpTelemetry.Log.IsEnabled())
{
if (responseTelemetryStarted)
{
HttpTelemetry.Log.ResponseContentStop();
}
if (telemetryStarted)
{
HttpTelemetry.Log.GetHelperStop();
}
}
}
}

Expand All @@ -221,59 +252,52 @@ public Task<byte[]> GetByteArrayAsync(string? requestUri, CancellationToken canc
GetByteArrayAsync(CreateUri(requestUri), cancellationToken);

public Task<byte[]> GetByteArrayAsync(Uri? requestUri, CancellationToken cancellationToken) =>
GetByteArrayAsyncCore(GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken), cancellationToken);
GetByteArrayAsyncCore(requestUri, cancellationToken);

private async Task<byte[]> GetByteArrayAsyncCore(Task<HttpResponseMessage> getTask, CancellationToken cancellationToken)
private async Task<byte[]> GetByteArrayAsyncCore(Uri? requestUri, CancellationToken cancellationToken)
{
// Wait for the response message.
using (HttpResponseMessage responseMessage = await getTask.ConfigureAwait(false))
bool telemetryStarted = false;
bool responseTelemetryStarted = false;

if (HttpTelemetry.Log.IsEnabled())
{
// Make sure it completed successfully.
responseMessage.EnsureSuccessStatusCode();
HttpTelemetry.Log.GetHelperStart(nameof(GetByteArrayAsync));
telemetryStarted = true;
}

// Get the response content.
HttpContent? c = responseMessage.Content;
if (c != null)
try
{
// Wait for the response message.
using (HttpResponseMessage responseMessage = await GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false))
{
#if NET46
return await c.ReadAsByteArrayAsync().ConfigureAwait(false);
#else
HttpContentHeaders headers = c.Headers;
using (Stream responseStream = c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
{
long? contentLength = headers.ContentLength;
Stream buffer; // declared here to share the state machine field across both if/else branches
// Make sure it completed successfully.
responseMessage.EnsureSuccessStatusCode();

if (contentLength.HasValue)
// Get the response content.
HttpContent? c = responseMessage.Content;
if (c != null)
{
if (HttpTelemetry.Log.IsEnabled())
{
// If we got a content length, then we assume that it's correct and create a MemoryStream
// to which the content will be transferred. That way, assuming we actually get the exact
// amount we were expecting, we can simply return the MemoryStream's underlying buffer.
buffer = new HttpContent.LimitMemoryStream(_maxResponseContentBufferSize, (int)contentLength.GetValueOrDefault());

try
{
await responseStream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (HttpContent.StreamCopyExceptionNeedsWrapping(e))
{
throw HttpContent.WrapStreamCopyException(e);
}

if (buffer.Length > 0)
{
return ((HttpContent.LimitMemoryStream)buffer).GetSizedBuffer();
}
HttpTelemetry.Log.ResponseContentStart();
responseTelemetryStarted = true;
}
else
#if NET46
return await c.ReadAsByteArrayAsync().ConfigureAwait(false);
#else
HttpContentHeaders headers = c.Headers;
using (Stream responseStream = c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false))
{
// If we didn't get a content length, then we assume we're going to have to grow
// the buffer potentially several times and that it's unlikely the underlying buffer
// at the end will be the exact size needed, in which case it's more beneficial to use
// ArrayPool buffers and copy out to a new array at the end.
buffer = new HttpContent.LimitArrayPoolWriteStream(_maxResponseContentBufferSize);
try
long? contentLength = headers.ContentLength;
Stream buffer; // declared here to share the state machine field across both if/else branches

if (contentLength.HasValue)
{
// If we got a content length, then we assume that it's correct and create a MemoryStream
// to which the content will be transferred. That way, assuming we actually get the exact
// amount we were expecting, we can simply return the MemoryStream's underlying buffer.
buffer = new HttpContent.LimitMemoryStream(_maxResponseContentBufferSize, (int)contentLength.GetValueOrDefault());

try
{
await responseStream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
Expand All @@ -285,17 +309,55 @@ private async Task<byte[]> GetByteArrayAsyncCore(Task<HttpResponseMessage> getTa

if (buffer.Length > 0)
{
return ((HttpContent.LimitArrayPoolWriteStream)buffer).ToArray();
return ((HttpContent.LimitMemoryStream)buffer).GetSizedBuffer();
}
}
else
{
// If we didn't get a content length, then we assume we're going to have to grow
// the buffer potentially several times and that it's unlikely the underlying buffer
// at the end will be the exact size needed, in which case it's more beneficial to use
// ArrayPool buffers and copy out to a new array at the end.
buffer = new HttpContent.LimitArrayPoolWriteStream(_maxResponseContentBufferSize);
try
{
try
{
await responseStream.CopyToAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (HttpContent.StreamCopyExceptionNeedsWrapping(e))
{
throw HttpContent.WrapStreamCopyException(e);
}

if (buffer.Length > 0)
{
return ((HttpContent.LimitArrayPoolWriteStream)buffer).ToArray();
}
}
finally { buffer.Dispose(); }
}
finally { buffer.Dispose(); }
}
#endif
}
#endif
}

// No content to return.
return Array.Empty<byte>();
// No content to return.
return Array.Empty<byte>();
}
}
finally
{
if (HttpTelemetry.Log.IsEnabled())
{
if (responseTelemetryStarted)
{
HttpTelemetry.Log.ResponseContentStop();
}
if (telemetryStarted)
{
HttpTelemetry.Log.GetHelperStop();
}
}
}
}

Expand All @@ -309,16 +371,50 @@ public Task<Stream> GetStreamAsync(Uri? requestUri) =>
GetStreamAsync(requestUri, CancellationToken.None);

public Task<Stream> GetStreamAsync(Uri? requestUri, CancellationToken cancellationToken) =>
FinishGetStreamAsync(GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken), cancellationToken);
FinishGetStreamAsync(requestUri, cancellationToken);

private async Task<Stream> FinishGetStreamAsync(Task<HttpResponseMessage> getTask, CancellationToken cancellationToken)
private async Task<Stream> FinishGetStreamAsync(Uri? requestUri, CancellationToken cancellationToken)
{
HttpResponseMessage response = await getTask.ConfigureAwait(false);
response.EnsureSuccessStatusCode();
HttpContent? c = response.Content;
return c != null ?
(c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false)) :
Stream.Null;
bool telemetryStarted = false;
bool responseTelemetryStarted = false;

if (HttpTelemetry.Log.IsEnabled())
{
HttpTelemetry.Log.GetHelperStart(nameof(GetStreamAsync));
telemetryStarted = true;
}

try
{
HttpResponseMessage response = await GetAsync(requestUri, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
HttpContent? c = response.Content;
if (c != null)
{
if (HttpTelemetry.Log.IsEnabled())
{
HttpTelemetry.Log.ResponseContentStart();
responseTelemetryStarted = true;
}

return c.TryReadAsStream() ?? await c.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
}
return Stream.Null;
}
finally
{
if (HttpTelemetry.Log.IsEnabled())
{
if (responseTelemetryStarted)
{
HttpTelemetry.Log.ResponseContentStop();
}
if (telemetryStarted)
{
HttpTelemetry.Log.GetHelperStop();
}
}
}
}

#endregion Simple Get Overloads
Expand Down Expand Up @@ -542,8 +638,7 @@ private async ValueTask<HttpResponseMessage> SendAsyncCore(HttpRequestMessage re
bool telemetryStarted = false;
if (HttpTelemetry.Log.IsEnabled())
{
// TODO https://github.com/dotnet/runtime/issues/40896: Enable tracing for HTTP/3.
if (request.Version.Major < 3 && request.RequestUri != null)
if (request.RequestUri != null)
{
HttpTelemetry.Log.RequestStart(
request.RequestUri.Scheme,
Expand Down Expand Up @@ -572,14 +667,31 @@ await base.SendAsync(request, cts.Token).ConfigureAwait(false) :
// Buffer the response content if we've been asked to and we have a Content to buffer.
if (buffered && response.Content != null)
{
if (async)
bool responseTelemetryStarted = false;
if (HttpTelemetry.Log.IsEnabled())
{
await response.Content.LoadIntoBufferAsync(_maxResponseContentBufferSize, cts.Token).ConfigureAwait(false);
HttpTelemetry.Log.ResponseContentStart();
responseTelemetryStarted = true;
}

try
{
if (async)
{
await response.Content.LoadIntoBufferAsync(_maxResponseContentBufferSize, cts.Token).ConfigureAwait(false);

}
else
{
response.Content.LoadIntoBuffer(_maxResponseContentBufferSize, cts.Token);
}
}
else
finally
{
response.Content.LoadIntoBuffer(_maxResponseContentBufferSize, cts.Token);
if (HttpTelemetry.Log.IsEnabled() && responseTelemetryStarted)
{
HttpTelemetry.Log.ResponseContentStop();
}
}
}

Expand Down
Loading

0 comments on commit ec189d4

Please sign in to comment.