Skip to content

Commit 8c181fd

Browse files
committed
non batch tasks + valkey test + redisinsight update
1 parent ec7129a commit 8c181fd

File tree

7 files changed

+130
-26
lines changed

7 files changed

+130
-26
lines changed

Core.TaskProcessor.SampleWebApi/Core.TaskProcessor.SampleWebApi.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
11+
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
1212
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.2" />
13-
<PackageReference Include="Scalar.AspNetCore" Version="2.0.26" />
13+
<PackageReference Include="Scalar.AspNetCore" Version="2.0.34" />
1414
</ItemGroup>
1515

1616
<ItemGroup>

Core.TaskProcessor.SampleWebApi/Program.cs

+23-15
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
builder.Services.AddTaskProcessor((sp, options) =>
1111
{
12-
options.Redis = "localhost:6379,abortConnect=false";
12+
options.Redis = "localhost:6380,abortConnect=false";
1313
options.Prefix = "{core}";
1414
options.Queues = ["high", "default"];
1515
options.MaxWorkers = 4;
@@ -20,19 +20,22 @@
2020
options.CleanUpFrequency = TimeSpan.FromMinutes(5);
2121
options.Retention = TimeSpan.FromDays(7);
2222
options.Deadletter = true;
23-
options.DeadletterUniqueSchedules = false;
23+
options.DeadletterSchedules = true;
2424
options.UseCronSeconds = true;
2525
options.OnTaskFailedDelay = (_, retry) =>
2626
Task.FromResult(retry switch
2727
{
2828
2 => TimeSpan.FromSeconds(5),
29-
1 => TimeSpan.FromSeconds(60),
29+
1 => TimeSpan.FromSeconds(10),
3030
_ => (TimeSpan?)null
3131
});
3232
options.OnTaskError = (_, exception) =>
3333
{
34+
//sp.GetRequiredService<ILogger<ITaskProcessor>>()
35+
// .LogError(exception, "Task Error");
36+
3437
sp.GetRequiredService<ILogger<ITaskProcessor>>()
35-
.LogError(exception, "Task Error");
38+
.LogError(exception.Message);
3639

3740
return Task.CompletedTask;
3841
};
@@ -58,23 +61,28 @@
5861
var proc = app.Services.GetRequiredService<ITaskProcessor>();
5962
await using var scope = app.Services.CreateAsyncScope();
6063
var faulty = scope.ServiceProvider.GetRequiredService<FaultyService>();
64+
var svc = scope.ServiceProvider.GetRequiredService<ISomeScopedService>();
6165

62-
//foreach (var schedule in await proc.GetSchedulesAsync("core", 0, 100))
63-
// await proc.CancelScheduleAsync(schedule.Id, "core");
66+
foreach (var schedule in await proc.GetSchedulesAsync("core", 0, 100))
67+
await proc.CancelScheduleAsync(schedule.Id, "core");
6468

6569
await proc.ResumeAsync();
6670

6771
//var discarded = await proc.DiscardDeadTasksAsync("default");
6872

69-
await proc.UpsertScheduleAsync(new ScheduleData
70-
{
71-
Id = "refresh",
72-
Tenant = "core",
73-
Queue = "default",
74-
Cron = "0 */2 * * * *",
75-
Timezone = "Etc/UTC",
76-
Unique = true
77-
}, () => faulty.DoFaultyStuff());
73+
//await proc.UpsertScheduleAsync(new ScheduleData
74+
//{
75+
// Id = "refresh",
76+
// Tenant = "core",
77+
// Queue = "default",
78+
// Cron = "0 */2 * * * *",
79+
// Timezone = "Etc/UTC",
80+
// Unique = true
81+
//}, () => faulty.DoFaultyStuff());
82+
83+
await proc.EnqueueTaskAsync("default", "core", () => faulty.DoFaultyStuff());
84+
//await proc.EnqueueTaskAsync("default", "core", () => svc.DoSomethingAsync("test", CancellationToken.None));
85+
//await proc.EnqueueTaskAsync("default", "core", () => svc.DoSomethingAsync("test-delay", CancellationToken.None), delayUntil: DateTimeOffset.UtcNow.AddSeconds(5));
7886
}
7987

8088
app.Run();

Core.TaskProcessor/Core.TaskProcessor.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
</ItemGroup>
3333

3434
<ItemGroup Condition=" '$(TargetFramework)' == 'net9.0' ">
35-
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.2" />
36-
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.2" />
35+
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.3" />
36+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.3" />
3737
</ItemGroup>
3838

3939
</Project>

Core.TaskProcessor/ITaskProcessor.cs

+17
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,23 @@ public interface ITaskProcessor
3535
Task<string> EnqueueBatchAsync(string queue, string tenant, List<TaskData> tasks,
3636
List<TaskData>? continuations = null, string? scope = null);
3737

38+
/// <summary>
39+
/// enqueue single task without batch
40+
/// </summary>
41+
/// <param name="queue">queue to run tasks and continuations on</param>
42+
/// <param name="tenant">tenant id</param>
43+
/// <param name="task"></param>
44+
/// <returns>task id</returns>
45+
Task<string> EnqueueTaskAsync(string queue, string tenant, TaskData task);
46+
47+
/// <summary>
48+
/// enqueue single task without batch
49+
/// </summary>
50+
/// <param name="queue">queue to run tasks and continuations on</param>
51+
/// <param name="tenant">tenant id</param>
52+
/// <param name="methodCall"></param>
53+
/// <returns>task id</returns>
54+
Task<string> EnqueueTaskAsync(string queue, string tenant, Expression<Func<Task>> methodCall, DateTimeOffset? delayUntil = null, int? retries = null);
3855

3956
/// <summary>
4057
/// appends tasks to existing a batch. continuations will run again if batch previously completed.

Core.TaskProcessor/TaskProcessor.cs

+70-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,52 @@ public async Task<string> EnqueueBatchAsync(string queue, string tenant, List<Ta
139139
return batchId;
140140
}
141141

142+
public async Task<string> EnqueueTaskAsync(string queue, string tenant, TaskData task)
143+
{
144+
var taskId = Guid.NewGuid().ToString("D");
145+
146+
var q = task.Queue ?? queue;
147+
148+
var db = _redis.GetDatabase();
149+
var tra = db.CreateTransaction();
150+
#pragma warning disable CS4014
151+
152+
tra.HashSetAsync(Prefix($"task:{taskId}"), [
153+
new HashEntry("id", taskId),
154+
new HashEntry("tenant", tenant),
155+
new HashEntry("data", task.Data),
156+
new HashEntry("topic", task.Topic),
157+
new HashEntry("queue", q),
158+
new HashEntry("retries", task.Retries ?? _options.Retries)
159+
]);
160+
161+
if (task.DelayUntil.HasValue)
162+
tra.SortedSetAddAsync(Prefix($"queue:{queue}:pushback"), taskId,
163+
task.DelayUntil.Value.ToUnixTimeSeconds());
164+
else
165+
{
166+
tra.SortedSetAddAsync(Prefix("queues"), q, DateTimeOffset.UtcNow.ToUnixTimeSeconds());
167+
tra.ListLeftPushAsync(Prefix($"queue:{q}"), taskId);
168+
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{q}:event")), "fetch");
169+
}
170+
171+
#pragma warning enable CS4014
172+
await tra.ExecuteAsync().ConfigureAwait(false);
173+
return taskId;
174+
}
175+
176+
public Task<string> EnqueueTaskAsync(string queue, string tenant, Expression<Func<Task>> methodCall, DateTimeOffset? delayUntil = null, int? retries = null)
177+
{
178+
return EnqueueTaskAsync(queue, tenant, new TaskData
179+
{
180+
Topic = "internal:expression:v1",
181+
Data = _options.ExpressionExecutor.Serialize(methodCall),
182+
DelayUntil = delayUntil,
183+
Queue = queue,
184+
Retries = retries
185+
});
186+
}
187+
142188
public async Task<bool> AppendBatchAsync(string queue, string tenant, string batchId, List<TaskData> tasks)
143189
{
144190
var db = _redis.GetDatabase();
@@ -306,6 +352,29 @@ public async Task<bool> FetchAsync()
306352
_tasks.TryAdd(info.TaskId, info);
307353
_actionBlock.Post(info);
308354
}
355+
else // single task
356+
{
357+
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_shutdown.Token);
358+
359+
var info = new TaskContext
360+
{
361+
Processor = this,
362+
TaskId = j,
363+
Tenant = (string)taskData["tenant"]!,
364+
Queue = (string?)taskData["queue"],
365+
CancelSource = linkedCts,
366+
CancelToken = linkedCts.Token,
367+
Topic = (string?)taskData["topic"] ?? string.Empty,
368+
Data = (byte[])taskData["data"]!,
369+
BatchId = null,
370+
Retries = (int?)taskData["retries"],
371+
IsCancellation = false,
372+
IsContinuation = false
373+
};
374+
375+
_tasks.TryAdd(info.TaskId, info);
376+
_actionBlock.Post(info);
377+
}
309378

310379
return true;
311380
}
@@ -575,7 +644,7 @@ private async Task Process(TaskContext task)
575644
tra.HashIncrementAsync(Prefix($"batch:{task.BatchId}"), "failed",
576645
flags: CommandFlags.FireAndForget);
577646
//remaining = tra.HashDecrementAsync(Prefix($"batch:{task.BatchId}"), "remaining");
578-
if (_options.Deadletter && (!string.IsNullOrEmpty(task.BatchId) || _options.DeadletterUniqueSchedules))
647+
if (_options.Deadletter && (string.IsNullOrEmpty(task.ScheduleId) || _options.DeadletterSchedules))
579648
tra.SortedSetAddAsync(Prefix($"queue:{task.Queue}:deadletter"), task.TaskId,
580649
DateTimeOffset.UtcNow.ToUnixTimeSeconds());
581650
else

Core.TaskProcessor/TaskProcessorOptions.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class TaskProcessorOptions
5252
/// <summary>
5353
/// when retries are exhausted on unique schedules they will never run again until task removed
5454
/// </summary>
55-
public bool DeadletterUniqueSchedules { get; set; } = false;
55+
public bool DeadletterSchedules { get; set; } = false;
5656

5757
/// <summary>
5858
/// deduplication window

redis/docker-compose.yml

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3.9'
2-
31
services:
42

53
redis:
@@ -12,17 +10,29 @@ services:
1210
- redis:/data
1311
command: redis-server --appendonly yes
1412

13+
valkey:
14+
container_name: valkey
15+
image: valkey/valkey:8.1-alpine
16+
restart: unless-stopped
17+
ports:
18+
- 6380:6379
19+
volumes:
20+
- valkey:/data
21+
command: valkey-server --appendonly yes
22+
1523
redisinsight:
1624
container_name: redisinsight
17-
image: redislabs/redisinsight:latest
25+
image: redis/redisinsight:latest
1826
restart: unless-stopped
1927
volumes:
20-
- redisinsight:/db
28+
- redisinsight:/data
2129
depends_on:
2230
- redis
31+
- valkey
2332
ports:
24-
- 8001:8001
33+
- 5540:5540
2534

2635
volumes:
2736
redis:
37+
valkey:
2838
redisinsight:

0 commit comments

Comments
 (0)