Skip to content

Commit 6af0073

Browse files
committed
unique schedules deadletter override + servicprovider injection for error handler + executer externalized
1 parent 972fcbd commit 6af0073

File tree

7 files changed

+90
-21
lines changed

7 files changed

+90
-21
lines changed

Core.TaskProcessor.SampleWebApi/Core.TaskProcessor.SampleWebApi.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
1212
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.2" />
13-
<PackageReference Include="Scalar.AspNetCore" Version="2.0.22" />
13+
<PackageReference Include="Scalar.AspNetCore" Version="2.0.26" />
1414
</ItemGroup>
1515

1616
<ItemGroup>

Core.TaskProcessor.SampleWebApi/Program.cs

+48-13
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,40 @@
77
builder.Services.AddControllers();
88
builder.Services.AddOpenApi();
99

10-
builder.Services.AddTaskProcessor(new TaskProcessorOptions
10+
builder.Services.AddTaskProcessor((sp, options) =>
1111
{
12-
Redis = "localhost:6379,abortConnect=false",
13-
Prefix = "{core}",
14-
Queues = new[] { "high", "default" },
15-
MaxWorkers = 4,
16-
Retries = 3,
17-
BaseFrequency = TimeSpan.FromSeconds(1),
18-
Invisibility = TimeSpan.FromMinutes(5),
19-
Retention = TimeSpan.FromDays(7),
20-
UseHostedService = true,
21-
UseCronSeconds = true
12+
options.Redis = "localhost:6379,abortConnect=false";
13+
options.Prefix = "{core}";
14+
options.Queues = ["high", "default"];
15+
options.MaxWorkers = 4;
16+
options.Retries = 3;
17+
options.Invisibility = TimeSpan.FromMinutes(5);
18+
options.BaseFrequency = TimeSpan.FromSeconds(5);
19+
options.PushbackFrequency = TimeSpan.FromSeconds(10);
20+
options.CleanUpFrequency = TimeSpan.FromMinutes(5);
21+
options.Retention = TimeSpan.FromDays(7);
22+
options.Deadletter = true;
23+
options.DeadletterUniqueSchedules = false;
24+
options.UseCronSeconds = true;
25+
options.OnTaskFailedDelay = (_, retry) =>
26+
Task.FromResult(retry switch
27+
{
28+
2 => TimeSpan.FromSeconds(5),
29+
1 => TimeSpan.FromSeconds(60),
30+
_ => (TimeSpan?)null
31+
});
32+
options.OnTaskError = (_, exception) =>
33+
{
34+
sp.GetRequiredService<ILogger<ITaskProcessor>>()
35+
.LogError(exception, "Task Error");
36+
37+
return Task.CompletedTask;
38+
};
2239
});
40+
builder.Services.AddTaskProcessorExecutor();
2341

2442
builder.Services.AddScoped<ISomeScopedService, SomeScopedService>();
43+
builder.Services.AddScoped<FaultyService>();
2544

2645
var app = builder.Build();
2746

@@ -37,9 +56,25 @@
3756

3857
{
3958
var proc = app.Services.GetRequiredService<ITaskProcessor>();
59+
await using var scope = app.Services.CreateAsyncScope();
60+
var faulty = scope.ServiceProvider.GetRequiredService<FaultyService>();
61+
62+
//foreach (var schedule in await proc.GetSchedulesAsync("core", 0, 100))
63+
// await proc.CancelScheduleAsync(schedule.Id, "core");
64+
65+
await proc.ResumeAsync();
66+
67+
//var discarded = await proc.DiscardDeadTasksAsync("default");
4068

41-
foreach (var schedule in await proc.GetSchedulesAsync("core", 0, 100))
42-
await proc.CancelScheduleAsync(schedule.Id, "core");
69+
await proc.UpsertScheduleAsync(new ScheduleData
70+
{
71+
Id = "refresh",
72+
Tenant = "core",
73+
Queue = "default",
74+
Cron = "0 */2 * * * *",
75+
Timezone = "Etc/UTC",
76+
Unique = true
77+
}, () => faulty.DoFaultyStuff());
4378
}
4479

4580
app.Run();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+

2+
namespace Core.TaskProcessor.SampleWebApi.Services;
3+
4+
public class FaultyService(ILogger<FaultyService> logger)
5+
{
6+
public async Task DoFaultyStuff()
7+
{
8+
logger.LogInformation(nameof(DoFaultyStuff));
9+
await Task.Delay(200);
10+
throw new Exception("crash");
11+
}
12+
}

Core.TaskProcessor/Core.TaskProcessor.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<ItemGroup>
2525
<PackageReference Include="Cronos" Version="0.9.0" />
26-
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
26+
<PackageReference Include="StackExchange.Redis" Version="2.8.31" />
2727
</ItemGroup>
2828

2929
<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">

Core.TaskProcessor/ServiceExtension.cs

+20-3
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,28 @@ public static class ServiceExtension
66
{
77
public static IServiceCollection AddTaskProcessor(this IServiceCollection collection, TaskProcessorOptions options)
88
{
9-
//collection.AddSingleton<IRemoteExpressionExecutor, RemoteExpressionExecutor>();
109
collection.AddSingleton<ITaskProcessor>(new TaskProcessor(options));
1110

12-
if (options.UseHostedService)
13-
collection.AddHostedService<TaskExecutorService>();
11+
return collection;
12+
}
13+
14+
public static IServiceCollection AddTaskProcessor(this IServiceCollection collection, Action<IServiceProvider, TaskProcessorOptions> configure)
15+
{
16+
var options = new TaskProcessorOptions();
17+
18+
collection.AddSingleton<ITaskProcessor>(sp =>
19+
{
20+
configure(sp, options);
21+
22+
return new TaskProcessor(options);
23+
});
24+
25+
return collection;
26+
}
27+
28+
public static IServiceCollection AddTaskProcessorExecutor(this IServiceCollection collection)
29+
{
30+
collection.AddHostedService<TaskExecutorService>();
1431

1532
return collection;
1633
}

Core.TaskProcessor/TaskProcessor.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ private async Task Process(TaskContext task)
575575
tra.HashIncrementAsync(Prefix($"batch:{task.BatchId}"), "failed",
576576
flags: CommandFlags.FireAndForget);
577577
//remaining = tra.HashDecrementAsync(Prefix($"batch:{task.BatchId}"), "remaining");
578-
if (_options.Deadletter)
578+
if (_options.Deadletter && (!string.IsNullOrEmpty(task.BatchId) || _options.DeadletterUniqueSchedules))
579579
tra.SortedSetAddAsync(Prefix($"queue:{task.Queue}:deadletter"), task.TaskId,
580580
DateTimeOffset.UtcNow.ToUnixTimeSeconds());
581581
else
@@ -663,7 +663,7 @@ private async Task Process(TaskContext task)
663663
CommandFlags.FireAndForget);
664664
}
665665

666-
if (!task.IsContinuation && string.IsNullOrEmpty(task.BatchId))
666+
if (!task.IsContinuation && !string.IsNullOrEmpty(task.BatchId))
667667
tra.HashIncrementAsync(Prefix($"batch:{task.BatchId}"), "duration",
668668
(DateTimeOffset.UtcNow - start).TotalSeconds, CommandFlags.FireAndForget);
669669
#pragma warning restore CS4014

Core.TaskProcessor/TaskProcessorOptions.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public class TaskProcessorOptions
4949
/// </summary>
5050
public bool Deadletter { get; set; } = true;
5151

52+
/// <summary>
53+
/// when retries are exhausted on unique schedules they will never run again until task removed
54+
/// </summary>
55+
public bool DeadletterUniqueSchedules { get; set; } = false;
56+
5257
/// <summary>
5358
/// deduplication window
5459
/// </summary>
@@ -60,7 +65,7 @@ public class TaskProcessorOptions
6065

6166
public string Redis { get; set; } = string.Empty;
6267

63-
public bool UseHostedService { get; set; }
68+
//public bool UseHostedService { get; set; }
6469
public bool UseCronSeconds { get; set; }
6570

6671
public IRemoteExpressionExecutor ExpressionExecutor { get; set; } = new RemoteExpressionExecutor();

0 commit comments

Comments
 (0)