-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathBlobPipelineTests.cs
177 lines (142 loc) · 6.79 KB
/
BlobPipelineTests.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Tes.Runner.Transfer;
namespace Tes.Runner.Test
{
/// <summary>
/// Unit tests verifying that the blob operation pipeline invokes the read, write and
/// completion hooks the expected number of times for a given source size and block size.
/// </summary>
[TestClass]
[TestCategory("Unit")]
public class BlobPipelineTests
{
#pragma warning disable CS8618 // Fields are assigned in SetUp, which runs before every test.
    private BlobOperationPipelineTestImpl operationPipeline;
    private BlobPipelineOptions options;
    private readonly int blockSize = BlobSizeUtils.MiB;
    private readonly long sourceSize = BlobSizeUtils.MiB * 10;
    private string tempFile1;
    private string tempFile2;
    private Channel<byte[]> memoryBuffer;
#pragma warning restore CS8618

    /// <summary>
    /// Creates the temporary files, the memory buffer pool and a fresh recording pipeline for each test.
    /// </summary>
    [TestInitialize]
    public async Task SetUp()
    {
        tempFile1 = await RunnerTestUtils.CreateTempFileAsync();
        tempFile2 = await RunnerTestUtils.CreateTempFileAsync();

        memoryBuffer = await MemoryBufferPoolFactory.CreateMemoryBufferPoolAsync(5, blockSize);
        options = new BlobPipelineOptions(blockSize, 10, 10, 10);
        operationPipeline = new BlobOperationPipelineTestImpl(options, memoryBuffer, sourceSize);
    }

    /// <summary>
    /// Removes the temporary files created in <see cref="SetUp"/>.
    /// </summary>
    [TestCleanup]
    public void CleanUp()
    {
        RunnerTestUtils.DeleteFileIfExists(tempFile1);
        RunnerTestUtils.DeleteFileIfExists(tempFile2);
    }

    [TestMethod]
    public async Task ExecuteAsync_SingleOperation_CallsReaderWriterAndCompleteMethods_CorrectNumberOfTimes()
    {
        var blobOp = new BlobOperationInfo(new Uri("https://foo.bar/con/blob"), tempFile1, tempFile1, true);

        await operationPipeline.ExecuteAsync(new List<BlobOperationInfo>() { blobOp });

        // One read and one write are expected per block: the source size divided by the block size.
        var expectedNumberOfCalls = sourceSize / blockSize;
        AssertReaderWriterAndCompleteMethodsAreCalled(operationPipeline, expectedNumberOfCalls, 1);
    }

    [TestMethod]
    public async Task ExecuteAsync_TwoOperations_CallsReaderWriterAndCompleteMethods_CorrectNumberOfTimes()
    {
        var blobOps = new List<BlobOperationInfo>()
        {
            new BlobOperationInfo(new Uri("https://foo.bar/con/blob1"), tempFile1, tempFile1, true),
            new BlobOperationInfo(new Uri("https://foo.bar/con/blob2"), tempFile2, tempFile2, true)
        };

        // SetUp already builds a fresh pipeline with these exact arguments for every test.
        await operationPipeline.ExecuteAsync(blobOps);

        // One read and one write per block (source size divided by block size), for each operation.
        var expectedNumberOfCalls = (sourceSize / blockSize) * blobOps.Count;
        AssertReaderWriterAndCompleteMethodsAreCalled(operationPipeline, expectedNumberOfCalls, 2);
    }

    /// <summary>
    /// Asserts the number of recorded write, read and completion calls on the given pipeline.
    /// </summary>
    /// <param name="operationPipeline">Recording pipeline whose captured calls are inspected.</param>
    /// <param name="numberOfWriterReaderCalls">Expected count for both ExecuteWriteAsync and ExecuteReadAsync.</param>
    /// <param name="numberOfCompleteCalls">Expected count for OnCompletionAsync.</param>
    private static void AssertReaderWriterAndCompleteMethodsAreCalled(BlobOperationPipelineTestImpl operationPipeline, long numberOfWriterReaderCalls, int numberOfCompleteCalls)
    {
        AssertMethodCallCount(operationPipeline, nameof(BlobOperationPipelineTestImpl.ExecuteWriteAsync), numberOfWriterReaderCalls);
        // The read count is checked against the read entry, not the write entry again.
        AssertMethodCallCount(operationPipeline, nameof(BlobOperationPipelineTestImpl.ExecuteReadAsync), numberOfWriterReaderCalls);
        AssertMethodCallCount(operationPipeline, nameof(BlobOperationPipelineTestImpl.OnCompletionAsync), numberOfCompleteCalls);
    }

    /// <summary>
    /// Asserts that <paramref name="methodName"/> was recorded exactly <paramref name="expectedCalls"/> times.
    /// </summary>
    private static void AssertMethodCallCount(BlobOperationPipelineTestImpl pipeline, string methodName, long expectedCalls)
    {
        // TryGetValue instead of the indexer: a method that was never called should fail the
        // assertion with a readable message rather than throw KeyNotFoundException.
        if (!pipeline.MethodCalls.TryGetValue(methodName, out var calls))
        {
            Assert.Fail($"{methodName} was never called.");
            return;
        }

        Assert.AreEqual(expectedCalls, (long)calls.Count, $"Unexpected number of {methodName} calls.");
    }
}
/// <summary>
/// Test implementation of <see cref="BlobOperationPipeline"/>.
/// Since there is no way to mock the base class, every overridden hook records its invocation
/// (method name, ordinal and arguments) so tests can inspect it through <see cref="MethodCalls"/>.
/// </summary>
class BlobOperationPipelineTestImpl : BlobOperationPipeline
{
    private readonly ConcurrentDictionary<string, List<MethodCall>> methodCalls = new();
    private readonly long sourceLength;

    // Serializes recording: List<T> is not safe for concurrent mutation and
    // ConcurrentDictionary does not run its value factories under a lock.
    private readonly object methodCallsLock = new();

    /// <summary>
    /// Creates the recording pipeline.
    /// </summary>
    /// <param name="pipelineOptions">Options forwarded to the base pipeline.</param>
    /// <param name="memoryBuffer">Buffer channel forwarded to the base pipeline.</param>
    /// <param name="sourceLength">Length reported by <see cref="GetSourceLengthAsync"/> for every source.</param>
    public BlobOperationPipelineTestImpl(BlobPipelineOptions pipelineOptions, Channel<byte[]> memoryBuffer, long sourceLength) : base(pipelineOptions, memoryBuffer)
    {
        this.sourceLength = sourceLength;
    }

    /// <summary>
    /// Recorded invocations, keyed by method name.
    /// </summary>
    public ConcurrentDictionary<string, List<MethodCall>> MethodCalls => methodCalls;

    /// <summary>Records the call and reports the whole buffer length as written.</summary>
    public override ValueTask<int> ExecuteWriteAsync(PipelineBuffer buffer)
    {
        AddMethodCall(nameof(ExecuteWriteAsync), buffer);
        return ValueTask.FromResult(buffer.Length);
    }

    /// <summary>Records the call and reports the whole buffer length as read.</summary>
    public override ValueTask<int> ExecuteReadAsync(PipelineBuffer buffer)
    {
        AddMethodCall(nameof(ExecuteReadAsync), buffer);
        return ValueTask.FromResult(buffer.Length);
    }

    /// <summary>Records the call and returns the fixed source length supplied to the constructor.</summary>
    public override Task<long> GetSourceLengthAsync(string source)
    {
        AddMethodCall(nameof(GetSourceLengthAsync), source);
        return Task.FromResult(sourceLength);
    }

    /// <summary>Records the completion call; performs no other work.</summary>
    public override Task OnCompletionAsync(long length, Uri? blobUrl, string fileName, string? rootHash)
    {
        Debug.Assert(blobUrl != null, nameof(blobUrl) + " != null");
        // rootHash may be null; it is recorded as-is.
        AddMethodCall(nameof(OnCompletionAsync), length, blobUrl, fileName, rootHash!);
        return Task.CompletedTask;
    }

    /// <summary>Records the call; the buffer itself is left untouched.</summary>
    public override void ConfigurePipelineBuffer(PipelineBuffer buffer)
    {
        AddMethodCall(nameof(ConfigurePipelineBuffer), buffer);
    }

    /// <summary>
    /// Runs the base pipeline for the given operations and returns its result.
    /// </summary>
    public async Task<long> ExecuteAsync(List<BlobOperationInfo> blobOperations)
    {
        return await ExecutePipelineAsync(blobOperations);
    }

    /// <summary>
    /// Appends an invocation record for <paramref name="methodName"/>, numbering it by its
    /// 1-based position among the calls already recorded for that method.
    /// </summary>
    private void AddMethodCall(string methodName, params Object[] args)
    {
        lock (methodCallsLock)
        {
            // Constant message template with structured arguments: interpolating the array
            // directly only ever printed "System.Object[]".
            Logger.LogInformation("Adding method call {MethodName} with args {Args}", methodName, args);

            var calls = methodCalls.GetOrAdd(methodName, _ => new List<MethodCall>());
            calls.Add(new MethodCall(methodName, calls.Count + 1, args.ToList()));
        }
    }
}
/// <summary>
/// A single invocation captured by <see cref="BlobOperationPipelineTestImpl"/>.
/// </summary>
/// <param name="MethodName">Name of the method that was invoked.</param>
/// <param name="InvocationTime">1-based ordinal of this call among the calls recorded for the same method name.</param>
/// <param name="Parameters">Arguments the method was invoked with, in call order.</param>
record MethodCall(string MethodName, int InvocationTime, List<object> Parameters);
}