Skip to content

Commit caf9e47

Browse files
committed
Ensure all inflight tasks are processed before stopping. fix #2
1 parent 99990dc commit caf9e47

File tree

3 files changed

+49
-10
lines changed

3 files changed

+49
-10
lines changed

samples/Sample/Program.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
.AddPrometheusExporter());
1616

1717

18-
builder.Services.AddBackgroundTask();
18+
builder.Services.AddBackgroundTask(opt =>
19+
{
20+
opt.Channels.First().Capacity = 100;
21+
});
1922
builder.Services.AddScoped<ServiceA>();
2023

2124
var app = builder.Build();
@@ -36,6 +39,6 @@ public class ServiceA
3639
{
3740
public async Task DoWork()
3841
{
39-
await Task.Delay(10 * 1000);
42+
await Task.Delay(60 * 1000);
4043
}
4144
}

src/DCA.Extensions.BackgroundTask/BackgroundTaskHostedService.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
2525
public async Task StopAsync(CancellationToken cancellationToken)
2626
{
2727
_logger.LogInformation("Stopping background channels...");
28-
await Task.WhenAny(channels.Values.Select(x => x.StopAsync()));
28+
await Task.WhenAll(channels.Values.Select(x => x.StopAsync()));
29+
// await Task.WhenAny(channels.Values.Select(x => x.StopAsync()));
2930
_logger.LogInformation("Stopped background channels...");
3031
}
3132
}

test/DCA.Extensions.BackgroundTask.Test/ProcessTests.cs

+42-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
using System.Collections.Frozen;
2-
using FluentAssertions;
1+
using System.Collections.Frozen;
2+
using FluentAssertions;
33
using Microsoft.Extensions.DependencyInjection;
44
using Microsoft.Extensions.Hosting;
55
using Xunit;
@@ -21,22 +21,57 @@ public async Task Should_RecordCheckpoints()
2121
await host.StartAsync(default);
2222

2323
var dispatcher = provider.GetRequiredService<IBackgroundTaskDispatcher>();
24-
for (int i = 0; i < 20; i++)
25-
{
24+
for (int i = 0; i < 20; i++)
25+
{
2626
await dispatcher.DispatchAsync(ctx => new ValueTask(Task.Delay(200)), new MyTaskContext(i));
2727
}
2828

29-
SpinWait.SpinUntil(() =>
30-
{
29+
SpinWait.SpinUntil(() =>
30+
{
3131
var checkpoint = defaultChannel.Checkpoints.Cast<BackgroundTask<MyTaskContext>>().FirstOrDefault();
32-
return checkpoint?.Context.Id == 19;
32+
return checkpoint?.Context.Id == 19;
3333
}, 5000).Should().BeTrue();
3434

3535
await host.StopAsync(default);
3636
var checkpoint = defaultChannel.Checkpoints.Cast<BackgroundTask<MyTaskContext>>().Single();
3737
checkpoint.Context.Id.Should().Be(19);
3838
}
3939

40+
[Fact]
41+
public async Task ShouldFinishingInflightTasksBeforeStop()
42+
{
43+
const int channelCnt = 10;
44+
var services = new ServiceCollection();
45+
services.AddLogging();
46+
services.AddBackgroundTask(opt=>{
47+
opt.Channels.Clear();
48+
for(var i = 0;i<channelCnt;i++)
49+
{
50+
opt.Channels.Add(new BackgroundTaskChannelOptions{Key = i.ToString(), Capacity = -1});
51+
}
52+
});
53+
var provider = services.BuildServiceProvider();
54+
55+
var host = (BackgroundTaskHostedService)provider.GetRequiredService<IHostedService>();
56+
await host.StartAsync(default);
57+
58+
const int taskCnt = 1000;
59+
var processedCnt = 0;
60+
var dispatcher = provider.GetRequiredService<IBackgroundTaskDispatcher>();
61+
for (int ch=0;ch<channelCnt;ch++)
62+
{
63+
for (int i = 0; i < taskCnt; i++)
64+
{
65+
await dispatcher.DispatchAsync(async ctx => {
66+
await Task.Delay(300*ch);
67+
Interlocked.Increment(ref processedCnt);
68+
}, new MyTaskContext(i), channel: ch.ToString());
69+
}
70+
}
71+
72+
await host.StopAsync(default);
73+
processedCnt.Should().Be(channelCnt*taskCnt);
74+
}
4075
}
4176

4277
public record MyTaskContext(int Id) : IBackgroundTaskContext;

0 commit comments

Comments
 (0)