Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement download and upload operations in TES scheduler via the TES Task runner #232

Merged
merged 23 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c63a976
Place published Tes.Runner CLI into TesApi.Web's scripts folder durin…
BMurri May 24, 2023
78903e3
Fix dropping published binary in non-published builds
BMurri May 24, 2023
b105e04
Upload node task runner to storage if its MD5 does not match
BMurri May 24, 2023
8d11657
Added upload of node task runner and ability to skip missing source f…
BMurri May 25, 2023
79c9326
Added ability to record upload/download metrics
BMurri May 26, 2023
8b7634c
Move non-temporary arguments to NodeTask
BMurri May 26, 2023
7faf14c
Batch node script changed to call node task runner to perform
BMurri May 26, 2023
24a81ed
cleanup
BMurri May 26, 2023
aedaa73
Mitigate CodeQL build failure
BMurri May 26, 2023
93bd8a8
fix path
BMurri May 27, 2023
8ca3ad2
Request write SAS in order to write/possibly update runner binary in …
BMurri May 27, 2023
e9ca47f
Move skipping files to NodeTask's Outputs and fix node runner blob path
BMurri May 30, 2023
b04ad53
Fix node script and improve ContainerRegistryProvider.IsImagePublic()
BMurri May 30, 2023
1eb7500
Cleaup TesApi.Web project file
BMurri May 31, 2023
c94410c
Remove memory hints (for now)
BMurri May 31, 2023
d8fe560
Prevent delay of starting API while uploading node runner to storage
BMurri May 31, 2023
05ed99e
Attempt to make checking/storing the node task runner binary more rubust
BMurri May 31, 2023
6ab91ea
Fix tRunner not dropping into final publish position
BMurri Jun 1, 2023
507a9ad
Incorporate PR feedback
BMurri Jun 1, 2023
7446e89
Merge branch 'main' into bmurri/tes-runner-up-down-load
BMurri Jun 1, 2023
ce5a113
Move changes to separate PR
BMurri Jun 1, 2023
4a894aa
Process upload directories
BMurri Jun 3, 2023
304a967
Merge branch 'main' into bmurri/tes-runner-up-down-load
BMurri Jun 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ dotnet_style_prefer_conditional_expression_over_assignment = true:silent
dotnet_style_prefer_conditional_expression_over_return = true:silent

# Enforce license header
dotnet_diagnostic.IDE0073.severity = warning
file_header_template = Copyright (c) Microsoft Corporation.\nLicensed under the MIT License.
dotnet_diagnostic.IDE0073.severity = warning
file_header_template = Copyright (c) Microsoft Corporation.\nLicensed under the MIT License.


###############################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@ public class NodeTask
public List<string>? CommandsToExecute { get; set; }
public List<FileInput>? Inputs { get; set; }
public List<FileOutput>? Outputs { get; set; }
public string? MetricsFilename { get; set; }
public string? InputsMetricsFormat { get; set; }
public string? OutputsMetricsFormat { get; set; }
}

public class FileOutput
{
public string? FullFileName { get; set; }
public string? TargetUrl { get; set; }
public SasResolutionStrategy SasStrategy { get; set; }
public SasResolutionStrategy? SasStrategy { get; set; }
public FileType? FileType { get; set; }
public bool? Required { get; set; }
}

public class FileInput
{
public string? FullFileName { get; set; }
public string? SourceUrl { get; set; }
public SasResolutionStrategy SasStrategy { get; set; }
public SasResolutionStrategy? SasStrategy { get; set; }
}

[JsonConverter(typeof(JsonStringEnumConverter))]
Expand All @@ -36,4 +41,11 @@ public enum SasResolutionStrategy
TerraWsm,
SchemeConverter,
}

[JsonConverter(typeof(JsonStringEnumConverter))]
public enum FileType
{
File,
Directory,
}
}
2 changes: 1 addition & 1 deletion src/Tes.Runner.Test/BlobDownloaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class BlobDownloaderTest
private BlobContainerClient blobContainerClient;
private Guid containerId;
private BlobDownloader blobDownloader;
private readonly BlobPipelineOptions blobPipelineOptions = new BlobPipelineOptions();
private readonly BlobPipelineOptions blobPipelineOptions = new();
#pragma warning restore CS8618
[TestInitialize]
public async Task Init()
Expand Down
9 changes: 4 additions & 5 deletions src/Tes.Runner.Test/BlobPipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public async Task ExecuteAsync_TwoOperations_CallsReaderWriterAndCompleteMethods
AssertReaderWriterAndCompleteMethodsAreCalled(pipeline, expectedNumberOfCalls, 2);
}

private void AssertReaderWriterAndCompleteMethodsAreCalled(BlobOperationPipelineTestImpl operationPipeline, long numberOfWriterReaderCalls, int numberOfCompleteCalls)
private static void AssertReaderWriterAndCompleteMethodsAreCalled(BlobOperationPipelineTestImpl operationPipeline, long numberOfWriterReaderCalls, int numberOfCompleteCalls)
{
List<MethodCall> executeWriteInfo = operationPipeline.MethodCalls["ExecuteWriteAsync"];
var executeWriteInfo = operationPipeline.MethodCalls["ExecuteWriteAsync"];
Assert.IsNotNull(executeWriteInfo);
Assert.AreEqual(numberOfWriterReaderCalls, executeWriteInfo.Count);

Expand All @@ -97,12 +97,11 @@ private void AssertReaderWriterAndCompleteMethodsAreCalled(BlobOperationPipeline
/// </summary>
class BlobOperationPipelineTestImpl : BlobOperationPipeline
{
private readonly ConcurrentDictionary<string, List<MethodCall>> methodCalls =
new ConcurrentDictionary<string, List<MethodCall>>();
private readonly ConcurrentDictionary<string, List<MethodCall>> methodCalls = new();

private readonly long sourceLength;

private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
private readonly SemaphoreSlim semaphore = new(1);

public BlobOperationPipelineTestImpl(BlobPipelineOptions pipelineOptions, Channel<byte[]> memoryBuffer, long sourceLength) : base(pipelineOptions, memoryBuffer)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Tes.Runner.Test/BlobUploaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class BlobUploaderTests
private BlobContainerClient blobContainerClient;
private Guid containerId;
private BlobUploader blobUploader;
private readonly BlobPipelineOptions blobPipelineOptions = new BlobPipelineOptions();
private readonly BlobPipelineOptions blobPipelineOptions = new();
#pragma warning restore CS8618

[TestInitialize]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void ToCommandArgs_SetsAllOptionsAsCliOptions()
Assert.AreEqual($"--bufferCapacity {BlobPipelineOptions.DefaultReadWriteBuffersCapacity}", args[4]);
Assert.AreEqual($"--apiVersion {BlobPipelineOptions.DefaultApiVersion}", args[5]);
Assert.AreEqual("--file file", args[6]);
Assert.AreEqual(7, args.Length);
}

[TestMethod]
Expand Down
28 changes: 14 additions & 14 deletions src/Tes.Runner.Test/PartReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ namespace Tes.Runner.Test
public class PartReaderTests
{
private const int MemBuffersCapacity = 10;
private PartsReader partsReader;
private Mock<IBlobPipeline> pipeline;
private Channel<byte[]> memoryBufferChannel;
private PartsReader? partsReader;
private Mock<IBlobPipeline>? pipeline;
private Channel<byte[]>? memoryBufferChannel;
private readonly int blockSizeBytes = BlobSizeUtils.DefaultBlockSizeBytes;
private BlobPipelineOptions options;
private Channel<PipelineBuffer> readBufferChannel;
private Channel<PipelineBuffer> writeBufferChannel;
private BlobPipelineOptions? options;
private Channel<PipelineBuffer>? readBufferChannel;
private Channel<PipelineBuffer>? writeBufferChannel;
private readonly long fileSize = BlobSizeUtils.MiB * 100;
private readonly string fileName = "tempFile";

Expand All @@ -38,18 +38,18 @@ public async Task StartPartsReaderAsync_CallsPipelineReadOperationExpectedNumber
{
var numberOfParts = await PrepareReaderChannelAsync();

await partsReader.StartPartsReaderAsync(readBufferChannel, writeBufferChannel);
await partsReader!.StartPartsReaderAsync(readBufferChannel!, writeBufferChannel!);

pipeline.Verify(p => p.ExecuteReadAsync(It.IsAny<PipelineBuffer>()), Times.Exactly(numberOfParts));
Assert.AreEqual(numberOfParts, writeBufferChannel.Reader.Count);
pipeline!.Verify(p => p.ExecuteReadAsync(It.IsAny<PipelineBuffer>()), Times.Exactly(numberOfParts));
Assert.AreEqual(numberOfParts, writeBufferChannel!.Reader.Count);
}

[TestMethod]
public async Task StartPartsReaderAsync_ThrowsWhenOneCallFailsFromTheList()
{
await PrepareReaderChannelAsync();
var calls = 0;
pipeline.Setup(p => p.ExecuteReadAsync(It.IsAny<PipelineBuffer>()))
pipeline!.Setup(p => p.ExecuteReadAsync(It.IsAny<PipelineBuffer>()))
.Callback(() =>
{
calls++;
Expand All @@ -63,26 +63,26 @@ public async Task StartPartsReaderAsync_ThrowsWhenOneCallFailsFromTheList()
}
});

await Assert.ThrowsExceptionAsync<InvalidOperationException>(() => partsReader.StartPartsReaderAsync(readBufferChannel, writeBufferChannel));
await Assert.ThrowsExceptionAsync<InvalidOperationException>(() => partsReader!.StartPartsReaderAsync(readBufferChannel!, writeBufferChannel!));
}

[TestMethod]
public async Task StartPartsReaderAsync_MemoryBuffersAreUsed()
{
var numberOfParts = await PrepareReaderChannelAsync();

await partsReader.StartPartsReaderAsync(readBufferChannel, writeBufferChannel);
await partsReader!.StartPartsReaderAsync(readBufferChannel!, writeBufferChannel!);

//The reader reads from the memory buffer to create parts, the number of items in the memory
//buffer must be the available must be the difference between the number of memory buffers and the number of parts to create
Assert.AreEqual(MemBuffersCapacity - numberOfParts, memoryBufferChannel.Reader.Count);
Assert.AreEqual(MemBuffersCapacity - numberOfParts, memoryBufferChannel!.Reader.Count);
}

private async Task<int> PrepareReaderChannelAsync()
{
var buffer = new PipelineBuffer();
var numberOfParts = (int)(fileSize / blockSizeBytes);
await RunnerTestUtils.AddPipelineBuffersAndCompleteChannelAsync(readBufferChannel, numberOfParts,
await RunnerTestUtils.AddPipelineBuffersAndCompleteChannelAsync(readBufferChannel!, numberOfParts,
new Uri("https://foo.bar/cont/blob"), blockSizeBytes, fileSize, fileName);
return numberOfParts;
}
Expand Down
13 changes: 8 additions & 5 deletions src/Tes.Runner.Test/PartsProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public async Task StartPartsProducersAsync_ProducesTheExpectedNumberOfParts(int
pipeline.Setup(p => p.GetSourceLengthAsync(It.IsAny<string>())).ReturnsAsync(fileSize);

var blobOp = new BlobOperationInfo(new Uri("https://foo.bar/con/blob"), "blob", "blob", false);
var opsList = new List<BlobOperationInfo>() { blobOp };

await partsProducer.StartPartsProducersAsync(new List<BlobOperationInfo>() { blobOp }, readBuffer);
await partsProducer.StartPartsProducersAsync(opsList, readBuffer);

readBuffer.Writer.Complete();

Expand All @@ -62,16 +63,17 @@ public async Task StartPartsProducersAsync_PartsAreProperSize(int blockSize, lon
pipeline.Setup(p => p.GetSourceLengthAsync(It.IsAny<string>())).ReturnsAsync(fileSize);

var blobOp = new BlobOperationInfo(new Uri("https://foo.bar/con/blob"), "blob", "blob", false);
var opsList = new List<BlobOperationInfo>() { blobOp };

await partsProducer.StartPartsProducersAsync(new List<BlobOperationInfo>() { blobOp }, readBuffer);
await partsProducer.StartPartsProducersAsync(opsList, readBuffer);

readBuffer.Writer.Complete();

var parts = await RunnerTestUtils.ReadAllPipelineBuffersAsync(readBuffer.Reader.ReadAllAsync());

Assert.AreEqual(expectedPartSize.Length, parts.Count);

for (int i = 0; i < parts.Count; i++)
for (var i = 0; i < parts.Count; i++)
{
Assert.AreEqual(expectedPartSize[i], parts[i].Length);
}
Expand All @@ -86,8 +88,9 @@ public async Task StartPartsProducersAsync_PartsHaveExpectedLengths(int blockSiz
pipeline.Setup(p => p.GetSourceLengthAsync(It.IsAny<string>())).ReturnsAsync(fileSize);

var blobOp = new BlobOperationInfo(new Uri("https://foo.bar/con/blob"), "blob", "blob", false);
var opsList = new List<BlobOperationInfo>() { blobOp };

await partsProducer.StartPartsProducersAsync(new List<BlobOperationInfo>() { blobOp }, readBuffer);
await partsProducer.StartPartsProducersAsync(opsList, readBuffer);

readBuffer.Writer.Complete();

Expand All @@ -99,7 +102,7 @@ public async Task StartPartsProducersAsync_PartsHaveExpectedLengths(int blockSiz

var expectedOffset = 0;

for (int i = 0; i < parts.Count; i++)
for (var i = 0; i < parts.Count; i++)
{
var part = parts[i];
Assert.AreEqual(expectedOffset, part.Offset);
Expand Down
26 changes: 13 additions & 13 deletions src/Tes.Runner.Test/PartsWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ namespace Tes.Runner.Test
public class PartsWriterTests
{
private const int MemBuffersCapacity = 10;
private PartsWriter partsWriter;
private Mock<IBlobPipeline> pipeline;
private Channel<byte[]> memoryBufferChannel;
private PartsWriter? partsWriter;
private Mock<IBlobPipeline>? pipeline;
private Channel<byte[]>? memoryBufferChannel;
private readonly int blockSizeBytes = BlobSizeUtils.DefaultBlockSizeBytes;
private BlobPipelineOptions options;
private Channel<ProcessedBuffer> processedBufferChannel;
private Channel<PipelineBuffer> writeBufferChannel;
private BlobPipelineOptions? options;
private Channel<ProcessedBuffer>? processedBufferChannel;
private Channel<PipelineBuffer>? writeBufferChannel;
private readonly long fileSize = BlobSizeUtils.MiB * 100;
private readonly string fileName = "tempFile";

[TestInitialize]
public async Task SetUp()
public void SetUp()
{
//the memory pool must be empty as the writer will write to it
memoryBufferChannel = Channel.CreateBounded<byte[]>(MemBuffersCapacity);
Expand All @@ -39,28 +39,28 @@ public async Task StartPartsWriterAsync_CallsPipelineWriteOperationExpectedNumbe
{
var numberOfParts = await PrepareWriterChannelAsync();

await partsWriter.StartPartsWritersAsync(writeBufferChannel, processedBufferChannel);
await partsWriter!.StartPartsWritersAsync(writeBufferChannel!, processedBufferChannel!);

pipeline.Verify(p => p.ExecuteWriteAsync(It.IsAny<PipelineBuffer>()), Times.Exactly(numberOfParts));
Assert.AreEqual(numberOfParts, processedBufferChannel.Reader.Count);
pipeline!.Verify(p => p.ExecuteWriteAsync(It.IsAny<PipelineBuffer>()), Times.Exactly(numberOfParts));
Assert.AreEqual(numberOfParts, processedBufferChannel!.Reader.Count);
}
[TestMethod]
public async Task StartPartsReaderAsync_MemoryBuffersAreReturned()
{
var numberOfParts = await PrepareWriterChannelAsync();

await partsWriter.StartPartsWritersAsync(writeBufferChannel, processedBufferChannel);
await partsWriter!.StartPartsWritersAsync(writeBufferChannel!, processedBufferChannel!);

//The writers write to the memory channel/pool after processing, the number of items in the memory
//buffer must the number of parts to that read from the writer's channel.
Assert.AreEqual(numberOfParts, memoryBufferChannel.Reader.Count);
Assert.AreEqual(numberOfParts, memoryBufferChannel!.Reader.Count);
}

private async Task<int> PrepareWriterChannelAsync()
{
var buffer = new PipelineBuffer();
var numberOfParts = (int)(fileSize / blockSizeBytes);
await RunnerTestUtils.AddPipelineBuffersAndCompleteChannelAsync(writeBufferChannel, numberOfParts,
await RunnerTestUtils.AddPipelineBuffersAndCompleteChannelAsync(writeBufferChannel!, numberOfParts,
new Uri("https://foo.bar/cont/blob"), blockSizeBytes, fileSize, fileName);
return numberOfParts;
}
Expand Down
22 changes: 11 additions & 11 deletions src/Tes.Runner.Test/ProcessedPartsProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace Tes.Runner.Test
[TestCategory("Unit")]
public class ProcessedPartsProcessorTests
{
private ProcessedPartsProcessor processedPartsProcessor;
private Mock<IBlobPipeline> pipeline;
private Channel<ProcessedBuffer> processedBuffer;
private Channel<PipelineBuffer> readBuffer;
private ProcessedPartsProcessor? processedPartsProcessor;
private Mock<IBlobPipeline>? pipeline;
private Channel<ProcessedBuffer>? processedBuffer;
private Channel<PipelineBuffer>? readBuffer;

[TestInitialize]
public void SetUp()
Expand All @@ -31,21 +31,21 @@ public void SetUp()
[DataRow(1, 0, 1)]
public async Task StartProcessedPartsProcessorAsync_CallsOnCompleteOnceForEachFileAndProccessedBufferIsEmpty(int expectedNumberOfFiles, long fileSize, int numberOfPartsPerFile)
{
processedPartsProcessor = new ProcessedPartsProcessor(pipeline.Object);
processedPartsProcessor = new ProcessedPartsProcessor(pipeline!.Object);

for (int f = 0; f < expectedNumberOfFiles; f++)
for (var f = 0; f < expectedNumberOfFiles; f++)
{
await RunnerTestUtils.AddProcessedBufferAsync(processedBuffer, $"file{f}", numberOfPartsPerFile,
await RunnerTestUtils.AddProcessedBufferAsync(processedBuffer!, $"file{f}", numberOfPartsPerFile,
fileSize);
};

await processedPartsProcessor.StartProcessedPartsProcessorAsync(expectedNumberOfFiles, processedBuffer, readBuffer);
await processedPartsProcessor!.StartProcessedPartsProcessorAsync(expectedNumberOfFiles, processedBuffer!, readBuffer!);

processedBuffer.Writer.Complete();
processedBuffer!.Writer.Complete();

pipeline.Verify(p => p.OnCompletionAsync(It.IsAny<long>(), It.IsAny<Uri?>(), It.IsAny<string>(), It.IsAny<string>()), Times.Exactly(expectedNumberOfFiles));
pipeline!.Verify(p => p.OnCompletionAsync(It.IsAny<long>(), It.IsAny<Uri?>(), It.IsAny<string>(), It.IsAny<string>()), Times.Exactly(expectedNumberOfFiles));

var parts = await RunnerTestUtils.ReadAllPipelineBuffersAsync(processedBuffer.Reader.ReadAllAsync());
var parts = await RunnerTestUtils.ReadAllPipelineBuffersAsync(processedBuffer!.Reader.ReadAllAsync());

Assert.IsNotNull(parts);
Assert.AreEqual(0, parts.Count);
Expand Down
6 changes: 3 additions & 3 deletions src/Tes.Runner.Test/ResolutionPolicyHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public async Task ApplyResolutionPolicyAsync_WhenTestTaskOutputsIsNotEmpty_Retur
{
var testTaskOutputs = new List<FileOutput>
{
new FileOutput(){FullFileName = "file", TargetUrl = "http://foo.bar", SasStrategy = SasResolutionStrategy.None},
new FileOutput(){FullFileName = "file1", TargetUrl = "http://foo1.bar", SasStrategy = SasResolutionStrategy.None},
new FileOutput(){FullFileName = "file2", TargetUrl = "http://foo2.bar", SasStrategy = SasResolutionStrategy.None}
new FileOutput(){FullFileName = "file", TargetUrl = "http://foo.bar", SasStrategy = SasResolutionStrategy.None, Required = true},
new FileOutput(){FullFileName = "file1", TargetUrl = "http://foo1.bar", SasStrategy = SasResolutionStrategy.None, Required = true},
new FileOutput(){FullFileName = "file2", TargetUrl = "http://foo2.bar", SasStrategy = SasResolutionStrategy.None, Required = true}
};
var result = await resolutionPolicyHandler.ApplyResolutionPolicyAsync(testTaskOutputs);
Assert.IsNotNull(result);
Expand Down
4 changes: 2 additions & 2 deletions src/Tes.Runner.Test/RunnerTestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public static string CalculateMd5(string file)
var hash = md5.ComputeHash(stream);
return Convert.ToBase64String(hash);
}

public static void DeleteFileIfExists(string file)
{
if (File.Exists(file))
Expand All @@ -45,7 +46,6 @@ public static async Task<List<T>> ReadAllPipelineBuffersAsync<T>(IAsyncEnumerabl
return pipelineBuffers;
}


public static string AddRandomDataAndReturnMd5(byte[] data)
{
Random.NextBytes(data);
Expand Down Expand Up @@ -130,7 +130,7 @@ public static List<PipelineBuffer> CreatePipelineBuffers(int numberOfParts, Uri

public static async Task AddProcessedBufferAsync(Channel<ProcessedBuffer> processedBuffer, string fileName, int numberOfParts, long fileSize)
{
for (int i = 0; i < numberOfParts; i++)
for (var i = 0; i < numberOfParts; i++)
{
var processedPart = new ProcessedBuffer(fileName, null, fileSize, i, numberOfParts, Channel.CreateUnbounded<FileStream>(), null, 0, null);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Tes.Runner.Storage;

namespace Tes.Runner.Storage.Tests
{
[TestClass()]
Expand Down
Loading