Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TES node runner project #211

Merged
merged 28 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d78f9d7
delete batch pool via WSM
giventocode Mar 24, 2023
3ada457
log message formatting
giventocode Mar 25, 2023
9ff10c9
initial scaffolding and code migration
giventocode Mar 25, 2023
bf7cbf9
pipeline refactoring
giventocode Mar 27, 2023
724c765
initial commit
giventocode Apr 25, 2023
539eec5
test and refactoring
giventocode Apr 26, 2023
cc50dd6
Merge branch 'ja-tes-runner' of https://github.com/microsoft/ga4gh-te…
giventocode Apr 26, 2023
3bc9d67
refactoring blobpipeline operations
giventocode Apr 27, 2023
ef8b36e
fix blob upload test
giventocode Apr 27, 2023
ce6afe3
dotnet format and part reader test
giventocode Apr 27, 2023
7884dba
parts processor refactoring, tests and documentation
giventocode Apr 27, 2023
64e2321
refactoring and tests
giventocode Apr 27, 2023
d6cf442
cli refactoring
giventocode Apr 28, 2023
9d5523e
retry policy for http ops
giventocode Apr 30, 2023
dfec1c4
docs, integration testsand refactoring
giventocode May 1, 2023
ed654be
readme move
giventocode May 1, 2023
431194f
CLI refactoring
giventocode May 2, 2023
22a610a
dotnet format
giventocode May 2, 2023
96df314
Ignore upload integration test
giventocode May 2, 2023
0da1ce4
added validation for buffer capacity
giventocode May 2, 2023
ae1a9e5
CLI refactoring, and json TES task
giventocode May 3, 2023
1d9d7a2
dotnet format
giventocode May 3, 2023
ac1a3c3
Cloud provider scheme converter
giventocode May 3, 2023
6ea5b4a
dotnet format
giventocode May 3, 2023
cd9c425
fix command binding not working.
giventocode May 3, 2023
5fe79cc
Minor formatting changes (#213)
MattMcL4475 May 4, 2023
d2ee55c
feedback merge, and formatting
giventocode May 4, 2023
104f9a2
minor renaming.
giventocode May 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Microsoft.GA4GH.TES.sln
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CommonUtilities.Tests", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenerateBatchVmSkus", "src\GenerateBatchVmSkus\GenerateBatchVmSkus.csproj", "{23C29BA5-3A52-4356-9220-FC84FE32E43E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tes.Runner", "Tes.Runner\Tes.Runner.csproj", "{8EFC0774-6B7A-480D-9F02-6350A05B8B84}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tes.RunnerCLI", "Tes.RunnerCLI\Tes.RunnerCLI.csproj", "{E81DF1B1-776E-4FDD-BEE0-0159F48EF321}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tes.Runner.Test", "Tes.Runner.Test\Tes.Runner.Test.csproj", "{FEB786B8-CE2C-4107-A04A-53D2F74839E5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -67,6 +73,18 @@ Global
{23C29BA5-3A52-4356-9220-FC84FE32E43E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23C29BA5-3A52-4356-9220-FC84FE32E43E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23C29BA5-3A52-4356-9220-FC84FE32E43E}.Release|Any CPU.Build.0 = Release|Any CPU
{8EFC0774-6B7A-480D-9F02-6350A05B8B84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8EFC0774-6B7A-480D-9F02-6350A05B8B84}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8EFC0774-6B7A-480D-9F02-6350A05B8B84}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8EFC0774-6B7A-480D-9F02-6350A05B8B84}.Release|Any CPU.Build.0 = Release|Any CPU
{E81DF1B1-776E-4FDD-BEE0-0159F48EF321}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E81DF1B1-776E-4FDD-BEE0-0159F48EF321}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E81DF1B1-776E-4FDD-BEE0-0159F48EF321}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E81DF1B1-776E-4FDD-BEE0-0159F48EF321}.Release|Any CPU.Build.0 = Release|Any CPU
{FEB786B8-CE2C-4107-A04A-53D2F74839E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FEB786B8-CE2C-4107-A04A-53D2F74839E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FEB786B8-CE2C-4107-A04A-53D2F74839E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FEB786B8-CE2C-4107-A04A-53D2F74839E5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
83 changes: 83 additions & 0 deletions Tes.Runner.Test/BlobDownloaderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Sas;
using Tes.Runner.Transfer;

namespace Tes.Runner.Test
{
[TestClass]
[TestCategory("Integration")]
[Ignore]
public class BlobDownloaderTest
{
private BlobContainerClient blobContainerClient;
private Guid containerId;
private BlobDownloader blobDownloader;
private readonly BlobPipelineOptions blobPipelineOptions = new BlobPipelineOptions();

[TestInitialize]
public async Task Init()
{
containerId = Guid.NewGuid();
var options = new BlobClientOptions(BlobClientOptions.ServiceVersion.V2020_12_06);

var blobService = new BlobServiceClient("UseDevelopmentStorage=true", options);

blobContainerClient = blobService.GetBlobContainerClient(containerId.ToString());

blobContainerClient.Create(PublicAccessType.None);

blobDownloader = new BlobDownloader(blobPipelineOptions,
await MemoryBufferPoolFactory.CreateMemoryBufferPoolAsync(10, blobPipelineOptions.BlockSizeBytes));
}

[TestCleanup]
public void Cleanup()
{
blobContainerClient.DeleteIfExists();
}

[DataTestMethod]
[DataRow(10, 0)]
[DataRow(10, 100)]
[DataRow(100, 0)]
[DataRow(99, 1)]
[DataRow(100, 1)]
[DataRow(0, 0)]
public async Task DownloadAsync_DownloadsFilesAndChecksumMatches(int numberOfMiB, int extraBytes)
{
var sourceFilename = await RunnerTestUtils.CreateTempFileWithContentAsync(numberOfMiB, extraBytes);
var blobClient = blobContainerClient.GetBlobClient(sourceFilename);

// Uploads a file.
await using var fileToUpload = File.OpenRead(sourceFilename);
await blobClient.UploadAsync(fileToUpload);

var url = CreateSasUrl(blobClient, sourceFilename);

var downloadFilename = sourceFilename + "_down";

await blobDownloader.DownloadAsync(new List<DownloadInfo>() { new DownloadInfo(downloadFilename, url) });

Assert.AreEqual(RunnerTestUtils.CalculateMd5(sourceFilename),
RunnerTestUtils.CalculateMd5(downloadFilename));
}

private Uri CreateSasUrl(BlobClient blobClient, string file)
{
var sasBuilder = new BlobSasBuilder(BlobContainerSasPermissions.All, DateTimeOffset.UtcNow.AddHours(1))
{
BlobContainerName = blobClient.GetParentBlobContainerClient().Name,
BlobName = file,
Resource = "b"
};

var url = blobContainerClient.GetBlobClient(file).GenerateSasUri(sasBuilder);
return url;
}
}
}
179 changes: 179 additions & 0 deletions Tes.Runner.Test/BlobPipelineTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Tes.Runner.Transfer;

namespace Tes.Runner.Test
{
[TestClass]
[TestCategory("Unit")]
public class BlobPipelineTests
{
#pragma warning disable CS8618
private BlobOperationPipelineTestImpl operationPipeline;
private BlobPipelineOptions options;
private readonly int blockSize = BlobSizeUtils.MiB;
private readonly long sourceSize = BlobSizeUtils.MiB * 10;
private string tempFile1;
private string tempFile2;
private Channel<byte[]> memoryBuffer;
private readonly RunnerTestUtils runnerTestUtils = new RunnerTestUtils();
#pragma warning restore CS8618

[TestInitialize]
public async Task SetUp()
{
tempFile1 = await RunnerTestUtils.CreateTempFileAsync();
tempFile2 = await RunnerTestUtils.CreateTempFileAsync();

memoryBuffer = await MemoryBufferPoolFactory.CreateMemoryBufferPoolAsync(5, blockSize);

options = new BlobPipelineOptions(blockSize, 10, 10, 10);
operationPipeline = new BlobOperationPipelineTestImpl(options, memoryBuffer, sourceSize);
}

[TestCleanup]
public void CleanUp()
{
RunnerTestUtils.DeleteFileIfExists(tempFile1);
RunnerTestUtils.DeleteFileIfExists(tempFile2);
}


[TestMethod]
public async Task ExecuteAsync_SingleOperation_CallsReaderWriterAndCompleteMethods_CorrectNumberOfTimes()
{
var blobOp = new BlobOperationInfo(new Uri("https://foo.bar/con/blob"), tempFile1, tempFile1, true);

await operationPipeline.ExecuteAsync(new List<BlobOperationInfo>() { blobOp });

//the number of calls should be size of the file divided by the number blocks
var expectedNumberOfCalls = (sourceSize / blockSize);

AssertReaderWriterAndCompleteMethodsAreCalled(operationPipeline, expectedNumberOfCalls, 1);
}

[TestMethod]
public async Task ExecuteAsync_TwoOperations_CallsReaderWriterAndCompleteMethods_CorrectNumberOfTimes()
{
var pipeline = new BlobOperationPipelineTestImpl(options, memoryBuffer, sourceSize);

var blobOps = new List<BlobOperationInfo>()
{
new BlobOperationInfo(new Uri("https://foo.bar/con/blob1"), tempFile1, tempFile1, true),
new BlobOperationInfo(new Uri("https://foo.bar/con/blob2"), tempFile2, tempFile2, true)
};
await pipeline.ExecuteAsync(blobOps);

//the number of calls should be size of the file divided by the number blocks, times the number of files
var expectedNumberOfCalls = (sourceSize / blockSize) * blobOps.Count;

AssertReaderWriterAndCompleteMethodsAreCalled(pipeline, expectedNumberOfCalls, 2);
}

private void AssertReaderWriterAndCompleteMethodsAreCalled(BlobOperationPipelineTestImpl operationPipeline, long numberOfWriterReaderCalls, int numberOfCompleteCalls)
{
List<MethodCall> executeWriteInfo = operationPipeline.MethodCalls["ExecuteWriteAsync"];
Assert.IsNotNull(executeWriteInfo);
Assert.AreEqual(numberOfWriterReaderCalls, executeWriteInfo.Count);

var executeReadInfo = operationPipeline.MethodCalls["ExecuteReadAsync"];
Assert.IsNotNull(executeReadInfo);
Assert.AreEqual(numberOfWriterReaderCalls, executeWriteInfo.Count);

var onCompletionInfo = operationPipeline.MethodCalls["OnCompletionAsync"];
Assert.IsNotNull(onCompletionInfo);
//complete must always be one
Assert.AreEqual(numberOfCompleteCalls, onCompletionInfo.Count);
}
}

/// <summary>
/// This is a test implementation of BlobPipeline.
/// Since there is no way to mock the base class, we have to create a test implementation and capture the execution of methods directly.
/// </summary>
class BlobOperationPipelineTestImpl : BlobOperationPipeline
{
private readonly ConcurrentDictionary<string, List<MethodCall>> methodCalls =
new ConcurrentDictionary<string, List<MethodCall>>();

private readonly long sourceLength;

private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);

public BlobOperationPipelineTestImpl(BlobPipelineOptions pipelineOptions, Channel<byte[]> memoryBuffer, long sourceLength) : base(pipelineOptions, memoryBuffer)
{
this.sourceLength = sourceLength;
}

public ConcurrentDictionary<string, List<MethodCall>> MethodCalls => methodCalls;

public override ValueTask<int> ExecuteWriteAsync(PipelineBuffer buffer)
{
AddMethodCall(nameof(ExecuteWriteAsync), buffer);
return ValueTask.FromResult(buffer.Length);
}

public override ValueTask<int> ExecuteReadAsync(PipelineBuffer buffer)
{
AddMethodCall(nameof(ExecuteReadAsync), buffer);
return ValueTask.FromResult(buffer.Length);
}

public override Task<long> GetSourceLengthAsync(string source)
{
AddMethodCall(nameof(GetSourceLengthAsync), source);
return Task.FromResult(sourceLength);
}

public override Task OnCompletionAsync(long length, Uri? blobUrl, string fileName)
{
Debug.Assert(blobUrl != null, nameof(blobUrl) + " != null");
AddMethodCall(nameof(OnCompletionAsync), length, blobUrl, fileName);
return Task.CompletedTask;
}

public override void ConfigurePipelineBuffer(PipelineBuffer buffer)
{
AddMethodCall(nameof(ConfigurePipelineBuffer), buffer);
}

public async Task<long> ExecuteAsync(List<BlobOperationInfo> blobOperations)
{
var data = await ExecutePipelineAsync(blobOperations);

return data;
}

private void AddMethodCall(string methodName, params Object[] args)
{
//the add/update factories are not thread safe, hence the semaphore here...
semaphore.Wait();

try
{
Logger.LogInformation($"Adding method call {methodName} with args {args}");
methodCalls.AddOrUpdate(methodName,
(key) => new List<MethodCall>() { new MethodCall(key, 1, args.ToList()) },
(key, value) =>
{
value.Add(new MethodCall(methodName, value.Count + 1,
args.ToList()));
return value;
});

}
finally
{
semaphore.Release();
}
}

}

record MethodCall(string MethodName, int InvocationTime, List<object> Parameters);
}
81 changes: 81 additions & 0 deletions Tes.Runner.Test/BlobUploaderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Sas;
using Tes.Runner.Transfer;

namespace Tes.Runner.Test
{
[TestClass]
[TestCategory("Integration")]
[Ignore]
public class BlobUploaderTests
{
#pragma warning disable CS8618
private BlobContainerClient blobContainerClient;
private Guid containerId;
private BlobUploader blobUploader;
private readonly BlobPipelineOptions blobPipelineOptions = new BlobPipelineOptions();
#pragma warning restore CS8618

[TestInitialize]
public async Task Init()
{
containerId = Guid.NewGuid();
var options = new BlobClientOptions(BlobClientOptions.ServiceVersion.V2020_12_06);

var blobService = new BlobServiceClient("UseDevelopmentStorage=true", options);

blobContainerClient = blobService.GetBlobContainerClient(containerId.ToString());

blobContainerClient.Create(PublicAccessType.None);

blobUploader = new BlobUploader(blobPipelineOptions,
await MemoryBufferPoolFactory.CreateMemoryBufferPoolAsync(10, blobPipelineOptions.BlockSizeBytes));
}

[TestCleanup]
public void Cleanup()
{
blobContainerClient.DeleteIfExists();
}
[DataTestMethod]
[DataRow(10, 0)]
[DataRow(10, 100)]
[DataRow(100, 0)]
[DataRow(99, 1)]
[DataRow(100, 1)]
[DataRow(0, 0)]
public async Task UploadFile_LocalFileUploadsAndMatchesSize(int numberOfMiB, int extraBytes)
{
var file = await RunnerTestUtils.CreateTempFileWithContentAsync(numberOfMiB, extraBytes);
var blobClient = blobContainerClient.GetBlobClient(file);

// Create a SAS token that's valid for one hour.
var url = CreateSasUrl(blobClient, file);

await blobUploader.UploadAsync(new List<UploadInfo>() { new UploadInfo(file, url) });

var blobProperties = await blobClient.GetPropertiesAsync();
var fileSize = (numberOfMiB * BlobSizeUtils.MiB) + extraBytes;
Assert.IsNotNull(blobProperties);
Assert.AreEqual(fileSize, blobProperties.Value.ContentLength);
}

private Uri CreateSasUrl(BlobClient blobClient, string file)
{
var sasBuilder = new BlobSasBuilder(BlobContainerSasPermissions.All, DateTimeOffset.UtcNow.AddHours(1))
{
BlobContainerName = blobClient.GetParentBlobContainerClient().Name,
BlobName = blobClient.Name,
Resource = "b"
};

var url = blobContainerClient.GetBlobClient(file).GenerateSasUri(sasBuilder);
return url;
}
}
}
Loading