Skip to content

Commit

Permalink
Defer starting workers until first snapshot has been downloaded (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeYallop authored Jun 2, 2024
1 parent d10de07 commit 51321a5
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion WaybackDownloader/DefaultCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override async Task<int> ExecuteAsync(CommandContext context, Settings se
}

var downloaderTask = downloaderService.StartDownloadAsync(settings.MatchUrl, settings.MatchType, settings.From, settings.To, settings.ParsedFilters, settings.LimitPages, workerCts.Token);
pageWorkerRunner.StartTasks(outputDir.FullName, settings.RateLimit, workerCts.Token);
pageWorkerRunner.StartWorkers(outputDir.FullName, settings.RateLimit, workerCts.Token);
pageWorkerRunnerTask = pageWorkerRunner.WaitForCompletionAsync();
await downloaderTask.ConfigureAwait(false);
await pageWorkerRunnerTask.ConfigureAwait(false);
Expand Down
8 changes: 8 additions & 0 deletions WaybackDownloader/Services/DownloaderService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public async Task StartDownloadAsync(string urlPrefix, string matchType, long? f
{
try
{
_downloadedFirstPage = true;
await _writer.WriteAsync(record, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand All @@ -44,6 +45,13 @@ public async Task StartDownloadAsync(string urlPrefix, string matchType, long? f
}
}

private volatile bool _downloadedFirstPage;
public bool DownloadedFirstPage
{
get => _downloadedFirstPage;
private set => _downloadedFirstPage = value;
}

private async IAsyncEnumerable<CdxRecord> GetInitialFileListAsync(string url, string matchType, long? from, long? to, CdxFilter[] filters, long? webpageLimit, [EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var record in cdxClient.GetSnapshotListAsync(url, matchType, from, to, filters, webpageLimit, cancellationToken: cancellationToken).WithCancellation(CancellationToken.None))
Expand Down
34 changes: 18 additions & 16 deletions WaybackDownloader/Services/PageWorkerRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,16 @@

namespace WaybackDownloader.Services;

public sealed class PageWorkerRunner(IServiceProvider serviceProvider, ILogger<PageWorkerRunner> logger) : IDisposable, IAsyncDisposable
internal sealed class PageWorkerRunner(IServiceProvider serviceProvider, ILogger<PageWorkerRunner> logger, DownloaderService downloaderService) : IDisposable, IAsyncDisposable
{
private readonly TaskCompletionSource _completionSource = new();
private readonly List<Task> _tasks = [];
#pragma warning disable CA2213 // Disposable fields should be disposed
//disposed by DI container
#pragma warning disable CA2213 // Disposable fields should be disposed - disposed by DI container
private readonly RateLimiter _limiter = serviceProvider.GetRequiredKeyedService<RateLimiter>(PageWorker.PageWorkerHttpClientRateLimiterKey);
#pragma warning restore CA2213 // Disposable fields should be disposed
public void StartTasks(string outputDir, int requestedDownloadLimit, CancellationToken cancellationToken)
{
_tasks.Add(StartAsync(outputDir, cancellationToken));
_tasks.Add(EvaluateLimitAsync(outputDir, requestedDownloadLimit, cancellationToken));
public void StartWorkers(string outputDir, int requestedDownloadLimit, CancellationToken cancellationToken)
=> _tasks.Add(EvaluateWorkersAsync(outputDir, requestedDownloadLimit, cancellationToken));

Task.WhenAll(_tasks).ContinueWith(t =>
{
_completionSource.SetResult();
}, TaskScheduler.Default);
}

public Task WaitForCompletionAsync() => _completionSource.Task;
public Task WaitForCompletionAsync() => Task.WhenAll(_tasks);

private Task StartAsync(string outputDir, CancellationToken cancellationToken)
{
Expand All @@ -33,11 +23,23 @@ private Task StartAsync(string outputDir, CancellationToken cancellationToken)

private long _lastTotalLeases;
private int _numberOfEvaluationsAtRequiredSpeed;
private async Task EvaluateLimitAsync(string outputDir, int requestedDownloadLimit, CancellationToken cancellationToken)
private async Task EvaluateWorkersAsync(string outputDir, int requestedDownloadLimit, CancellationToken cancellationToken)
{
const int SegmentDurationSeconds = 2;
const float MinimumThreshold = 0.9f;
await Task.Yield();
while (!downloaderService.DownloadedFirstPage)
{
await Task.Delay(1000, cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (cancellationToken.IsCancellationRequested)
{
return;
}
}

_tasks.Add(StartAsync(outputDir, cancellationToken));
await Task.Delay(SegmentDurationSeconds * 1000, cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

while (!cancellationToken.IsCancellationRequested)
{
if (_tasks.Any(x => x.IsCompleted))
Expand Down
3 changes: 2 additions & 1 deletion WaybackDownloader/Services/WaybackCdxClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ internal sealed class WaybackCdxClient(HttpClient client, ILogger<WaybackCdxClie
var webpageLimitReached = false;
while (true)
{
var url = $"https://web.archive.org/cdx/search/cdx?{query}&collapse=digest&page={page}";
//pageSize == number of ~6000 line blocks, not the number of results on a page
var url = $"https://web.archive.org/cdx/search/cdx?{query}&collapse=digest&page={page}&pageSize=5";

_logger.DownloadingSnapshotPage(page, url);

Expand Down

0 comments on commit 51321a5

Please sign in to comment.