-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathLazyLoadingReadOnlyStream.cs
483 lines (403 loc) · 17.7 KB
/
LazyLoadingReadOnlyStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Buffers;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Storage.Shared;
namespace Azure.Storage
{
/// <summary>
/// Used for Open Read APIs.
/// </summary>
internal class LazyLoadingReadOnlyStream<TProperties> : Stream
{
/// <summary>
/// Delegate for a resource's direct REST download method.
/// </summary>
/// <param name="range">
/// Content range to download.
/// </param>
/// <param name="transferValidation">
/// Optional validation options.
/// </param>
/// <param name="async">
/// Whether to perform the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token for cancelling the download request.
/// </param>
/// <returns>
/// Downloaded resource content.
/// </returns>
public delegate Task<Response<IDownloadedContent>> DownloadInternalAsync(
HttpRange range,
DownloadTransferValidationOptions transferValidation,
bool async,
CancellationToken cancellationToken);
/// <summary>
/// Delegate for getting properties for the target resource.
/// </summary>
/// <param name="async">
/// Whether to perform the operation asynchronously.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token for cancelling the download request.
/// </param>
/// <returns>
/// Resource properties.
/// </returns>
public delegate Task<Response<TProperties>> GetPropertiesAsync(bool async, CancellationToken cancellationToken);
/// <summary>
/// Delegate to replicate how a client will alter the download range.
/// Used to avoid requesting blob ranges that will result in error after transformation.
/// </summary>
/// <param name="range">Range this stream will request on download.</param>
/// <returns>Range the underlying client will adjust to.</returns>
/// <remarks>
/// Used by advanced features such as clientside encryption, which alters ranges to
/// ensure necessary info for decryption is downloaded.
/// </remarks>
public delegate HttpRange PredictEncryptedRangeAdjustment(HttpRange range);
/// <summary>
/// No-op for range adjustment.
/// </summary>
public static PredictEncryptedRangeAdjustment NoRangeAdjustment => range => range;
/// <summary>
/// The current position within the blob or file.
/// </summary>
private long _position;
/// <summary>
/// Last known length of underlying blob or file.
/// </summary>
private long _length;
/// <summary>
/// The number of bytes to download per call.
/// </summary>
private readonly int _bufferSize;
/// <summary>
/// The backing buffer.
/// </summary>
private byte[] _buffer;
/// <summary>
/// The current position within the buffer.
/// </summary>
private int _bufferPosition;
/// <summary>
/// The current length of the buffer that is populated.
/// </summary>
private int _bufferLength;
/// <summary>
/// If we are allowing the blob to be modifed while we read it.
/// </summary>
private readonly bool _allowBlobModifications;
/// <summary>
/// Indicated the user has called Seek() since the last Read() call, and the new position is outside _buffer.
/// </summary>
private bool _bufferInvalidated;
/// <summary>
/// Download() function.
/// </summary>
private readonly DownloadInternalAsync _downloadInternalFunc;
/// <summary>
/// Function to get properties.
/// </summary>
private readonly GetPropertiesAsync _getPropertiesInternalFunc;
/// <summary>
/// Hashing options to use with <see cref="_downloadInternalFunc"/>.
/// </summary>
private readonly DownloadTransferValidationOptions _validationOptions;
/// <summary>
/// Helper to determine how <see cref="_downloadInternalFunc"/> will adjust the range this class.
/// requests.
/// </summary>
private readonly PredictEncryptedRangeAdjustment _predictEncryptedRangeAdjustment;
public LazyLoadingReadOnlyStream(
DownloadInternalAsync downloadInternalFunc,
GetPropertiesAsync getPropertiesFunc,
DownloadTransferValidationOptions transferValidation,
bool allowModifications,
long initialLength,
long position = 0,
int? bufferSize = default,
PredictEncryptedRangeAdjustment rangePredictionFunc = default)
{
_downloadInternalFunc = downloadInternalFunc;
_getPropertiesInternalFunc = getPropertiesFunc;
_predictEncryptedRangeAdjustment = rangePredictionFunc ?? (range => range);
_position = position;
// If the blob cannot be modified and the total blob size is less than the default streaming size,
// the buffer size should be limited to the total blob size.
int maxBufferSize = allowModifications ? Constants.DefaultStreamingDownloadSize : (int)Math.Min(initialLength, Constants.DefaultStreamingDownloadSize);
_bufferSize = bufferSize ?? maxBufferSize;
_buffer = ArrayPool<byte>.Shared.Rent(_bufferSize);
_allowBlobModifications = allowModifications;
_bufferPosition = 0;
_bufferLength = 0;
_length = initialLength;
_bufferInvalidated = false;
// the caller to this stream cannot defer validation, as they cannot access a returned hash
if (!(transferValidation?.AutoValidateChecksum ?? true))
{
throw Errors.CannotDeferTransactionalHashVerification();
}
// we defer hash validation on download calls to validate in-place with our existing buffer
_validationOptions = transferValidation == default
? default
: new DownloadTransferValidationOptions
{
ChecksumAlgorithm = transferValidation.ChecksumAlgorithm,
AutoValidateChecksum = false
};
}
public override int Read(byte[] buffer, int offset, int count)
=> ReadInternal(
buffer,
offset,
count,
async: false,
default)
.EnsureCompleted();
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> await ReadInternal(
buffer,
offset,
count,
async: true,
cancellationToken)
.ConfigureAwait(false);
public async Task<int> ReadInternal(byte[] buffer, int offset, int count, bool async, CancellationToken cancellationToken)
{
ValidateReadParameters(buffer, offset, count);
if (_position == _length)
{
if (_allowBlobModifications)
{
// In case the blob grow since our last download call.
_length = await GetBlobLengthInternal(async, cancellationToken).ConfigureAwait(false);
if (_position == _length)
{
return 0;
}
}
else
{
return 0;
}
}
if (_bufferPosition == _bufferLength || _bufferInvalidated)
{
int lastDownloadedBytes = await DownloadInternal(async, cancellationToken).ConfigureAwait(false);
if (lastDownloadedBytes == 0)
{
return 0;
}
_bufferInvalidated = false;
}
int remainingBytesInBuffer = _bufferLength - _bufferPosition;
// We will return the minimum of remainingBytesInBuffer and the count provided by the user
int bytesToWrite = Math.Min(remainingBytesInBuffer, count);
Array.Copy(_buffer, _bufferPosition, buffer, offset, bytesToWrite);
_position += bytesToWrite;
_bufferPosition += bytesToWrite;
return bytesToWrite;
}
private async Task<int> DownloadInternal(bool async, CancellationToken cancellationToken)
{
Response<IDownloadedContent> response;
HttpRange range = new HttpRange(_position, _bufferSize);
// if _downloadInternalFunc is going to produce a range out of bounds response, we're at the end of the blob
if (_predictEncryptedRangeAdjustment(range).Offset >= _length)
{
return 0;
}
response = await _downloadInternalFunc(range, _validationOptions, async, cancellationToken).ConfigureAwait(false);
using Stream networkStream = response.Value.Content;
// The number of bytes we just downloaded.
long downloadSize = GetResponseRange(response.GetRawResponse()).Length.Value;
// The number of bytes we copied in the last loop.
int copiedBytes;
// Bytes we have copied so far.
int totalCopiedBytes = 0;
// Bytes remaining to copy. It is save to truncate the long because we asked for a max of int _buffer size bytes.
int remainingBytes = (int)downloadSize;
do
{
if (async)
{
copiedBytes = await networkStream.ReadAsync(
buffer: _buffer,
offset: totalCopiedBytes,
count: remainingBytes,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
else
{
copiedBytes = networkStream.Read(
buffer: _buffer,
offset: totalCopiedBytes,
count: remainingBytes);
}
totalCopiedBytes += copiedBytes;
remainingBytes -= copiedBytes;
}
while (copiedBytes != 0);
_bufferPosition = 0;
_bufferLength = totalCopiedBytes;
_length = GetBlobLengthFromResponse(response.GetRawResponse());
// if we deferred transactional hash validation on download, validate now
// currently we always defer but that may change
if (_validationOptions != default && _validationOptions.ChecksumAlgorithm != StorageChecksumAlgorithm.None && !_validationOptions.AutoValidateChecksum)
{
ContentHasher.AssertResponseHashMatch(_buffer, _bufferPosition, _bufferLength, _validationOptions.ChecksumAlgorithm, response.GetRawResponse());
}
return totalCopiedBytes;
}
private static void ValidateReadParameters(byte[] buffer, int offset, int count)
{
if (buffer == null)
{
throw new ArgumentNullException($"{nameof(buffer)}", $"{nameof(buffer)} cannot be null.");
}
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset), $"{nameof(offset)} cannot be less than 0.");
}
if (offset > buffer.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset), $"{nameof(offset)} cannot exceed {nameof(buffer)} length.");
}
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count), $"{nameof(count)} cannot be less than 0.");
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
// Return the buffer to the pool if we're called from Dispose or a finalizer
if (_buffer != null)
{
ArrayPool<byte>.Shared.Return(_buffer, clearArray: true);
_buffer = null;
}
}
private async Task<long> GetBlobLengthInternal(bool async, CancellationToken cancellationToken)
{
Response<TProperties> response = await _getPropertiesInternalFunc(async, cancellationToken).ConfigureAwait(false);
response.GetRawResponse().Headers.TryGetValue("Content-Length", out string lengthString);
if (lengthString == null)
{
throw new ArgumentException($"{HttpHeader.Names.ContentLength} header is missing on get properties response.");
}
return Convert.ToInt64(lengthString, CultureInfo.InvariantCulture);
}
private static long GetBlobLengthFromResponse(Response response)
{
response.Headers.TryGetValue("Content-Range", out string lengthString);
if (lengthString == null)
{
throw new ArgumentException("Content-Range header is missing on download response.");
}
string[] split = lengthString.Split('/');
return Convert.ToInt64(split[1], CultureInfo.InvariantCulture);
}
private static HttpRange GetResponseRange(Response response)
{
response.Headers.TryGetValue("Content-Range", out string rangeString);
if (rangeString == null)
{
throw new InvalidOperationException("Content-Range header is missing on download response.");
}
string[] split = rangeString.Split('/');
string[] rangeSplit = split[0].Split('-');
string[] firstbyteSplit = rangeSplit[0].Split(' ');
long firstByte = Convert.ToInt64(firstbyteSplit[1], CultureInfo.InvariantCulture);
long lastByte = Convert.ToInt64(rangeSplit[1], CultureInfo.InvariantCulture);
return new HttpRange(firstByte, lastByte - firstByte + 1);
}
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override long Length => _length;
public override long Position
{
get => _position;
set => Seek(value, SeekOrigin.Begin);
}
public override long Seek(long offset, SeekOrigin origin)
{
long newPosition = CalculateNewPosition(offset, origin);
if (newPosition == _position)
{
return _position;
}
// newPosition < 0
if (newPosition < 0)
{
throw new ArgumentException($"New {nameof(offset)} cannot be less than 0. Value was {newPosition}", nameof(offset));
}
// newPosition > _length
if (newPosition > _length)
{
throw new ArgumentException("You cannot seek past the last known length of the underlying blob or file.", nameof(offset));
}
// newPosition is less than _position, but within _buffer.
long beginningOfBuffer = _position - _bufferPosition;
if (newPosition < _position && newPosition >= beginningOfBuffer)
{
_bufferPosition = (int)(newPosition - beginningOfBuffer);
_position = newPosition;
return newPosition;
}
// newPosition is greater than _position, but within _buffer.
long endOfBuffer = _position + (_bufferLength - _bufferPosition);
if (newPosition > _position && newPosition < endOfBuffer)
{
_bufferPosition = (int)(newPosition - beginningOfBuffer);
_position = newPosition;
return newPosition;
}
// newPosition is outside of _buffer, we will need to re-download.
_bufferInvalidated = true;
_position = newPosition;
return newPosition;
}
internal long CalculateNewPosition(long offset, SeekOrigin origin)
{
switch (origin)
{
case SeekOrigin.Begin:
return offset;
case SeekOrigin.Current:
return _position + offset;
case SeekOrigin.End:
if (_allowBlobModifications)
{
throw new ArgumentException($"Cannot {nameof(Seek)} with {nameof(SeekOrigin)}.{nameof(SeekOrigin.End)} on a growing blob or file. Call Stream.Seek(Stream.Length, SeekOrigin.Begin) to get to the end of known data.", nameof(origin));
}
else
{
return _length + offset;
}
default:
throw new ArgumentException($"Unknown ${nameof(SeekOrigin)} value", nameof(origin));
}
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Flush() { }
public override Task FlushAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
}