Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable queue consummation delay #411

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions DocsV2/docs/Queuing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ meta:

[[toc]]

Coravel gives you a zero-configuration queue that runs in-memory.
Coravel gives you a zero-configuration queue that runs in-memory.

This is useful to offload long-winded tasks to the background instead of making your users wait for their HTTP request to finish.

Expand All @@ -33,6 +33,7 @@ public HomeController(IQueue queue) {
```

## Queuing Jobs

### Queuing Invocables

To queue an invocable, use `QueueInvocable`:
Expand Down Expand Up @@ -93,7 +94,7 @@ Use the `QueueAsyncTask` to queue up an async task:

### Queuing Synchronously

You use the `QueueTask()` method to add a task to the queue.
You use the `QueueTask()` method to add a task to the queue.

```csharp
public IActionResult QueueTask() {
Expand All @@ -109,7 +110,7 @@ Event broadcasting is great - but what if your event listeners are doing some he
By using `QueueBroadcast`, you can queue an event to be broadcasted in the background.

```csharp
this._queue.QueueBroadcast(new OrderCreated(orderId));
this._queue.QueueBroadcast(new OrderCreated(orderId));
```

### Queuing Cancellable Invocables
Expand All @@ -120,7 +121,7 @@ By using `QueueCancellableInvocable` you can build invocables that can be cancel

```csharp
var (taskGuid, token) = queue.QueueCancellableInvocable<CancellableInvocable>();

// Somewhere else....

token.Cancel();
Expand All @@ -138,6 +139,7 @@ while(!this.Token.IsCancellationRequested)
await ProcessNextRecord();
}
```

## Metrics

You can gain some insight into how the queue is doing at a given moment in time.
Expand All @@ -153,7 +155,7 @@ Available methods:

## Tracking Task Progress

Most of the methods on the `IQueue` interface will return a `Guid` that represents the unique id for the task you pushed to the queue. Also, Coravel's queue exposes some internal events that you can hook into.
Most of the methods on the `IQueue` interface will return a `Guid` that represents the unique id for the task you pushed to the queue. Also, Coravel's queue exposes some internal events that you can hook into.

Combining these: you can create listeners for the events `QueueTaskStarted` and `QueueTaskCompleted` that verify the progress of specific tasks in real-time. When a task/job crashes, then the event `DequeuedTaskFailed` will be emitted. Creating a listener for this one might be helpful too.

Expand Down Expand Up @@ -198,6 +200,19 @@ You can adjust this delay in the `appsettings.json` file.
}
```

Alternatively, you can adjust the consummation delay using `AddQueue`:

```csharp
services.AddQueue(queueOptions => {
// Consume queue every 5 seconds.
queueOptions.ConsummationDelay = 5;
});
```

:::tip
`QueueOptions` will take precedence over your configuration file if both are defined.
:::

## Logging Task Progress

Coravel uses the `ILogger` .NET Core interface to allow logging task progress:
Expand Down
17 changes: 17 additions & 0 deletions Src/Coravel/QueueServiceRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ public static class QueueServiceRegistration
/// <returns></returns>
public static IServiceCollection AddQueue(this IServiceCollection services)
{
services.AddSingleton<QueueOptions>(new QueueOptions());
services.AddSingleton<IQueue>(p =>
new Queue(
p.GetRequiredService<IServiceScopeFactory>(),
p.GetService<IDispatcher>()
)
);
services.AddHostedService<QueuingHost>();
return services;
}

public static IServiceCollection AddQueue(this IServiceCollection services, Action<QueueOptions> options)
{
var opt = new QueueOptions();
options(opt);

services.AddSingleton<QueueOptions>(opt);
services.AddSingleton<IQueue>(p =>
new Queue(
p.GetRequiredService<IServiceScopeFactory>(),
Expand Down
8 changes: 4 additions & 4 deletions Src/Coravel/Queuing/HostedService/QueuingHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ internal class QueuingHost : IHostedService, IDisposable
private Queue _queue;
private IConfiguration _configuration;
private ILogger<QueuingHost> _logger;
private QueueOptions _queueOptions;
private readonly string QueueRunningMessage = "Coravel Queuing service is attempting to close but the queue is still running." +
" App closing (in background) will be prevented until dequeued tasks are completed.";

public QueuingHost(IQueue queue, IConfiguration configuration, ILogger<QueuingHost> logger)
public QueuingHost(IQueue queue, IConfiguration configuration, ILogger<QueuingHost> logger, QueueOptions queueOptions)
{
this._configuration = configuration;
this._queue = queue as Queue;
this._logger = logger;
this._queueOptions = queueOptions;
}

public Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -37,9 +39,7 @@ public Task StartAsync(CancellationToken cancellationToken)

private int GetConsummationDelay()
{
var configurationSection = this._configuration.GetSection("Coravel:Queue:ConsummationDelay");
bool couldParseDelay = int.TryParse(configurationSection.Value, out var parsedDelay);
return couldParseDelay ? parsedDelay : 30;
return this._queueOptions.GetConsummationDelay(this._configuration);
}

private async Task ConsumeQueueAsync()
Expand Down
26 changes: 26 additions & 0 deletions Src/Coravel/Queuing/QueueOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Coravel.Queuing
{
public class QueueOptions
{
/// <summary>
/// This determines how often (in seconds) the queue host will consume all pending tasks.
/// </summary>
public int? ConsummationDelay { get; set; }

public int GetConsummationDelay(IConfiguration configuration)
{
if (ConsummationDelay.HasValue) return ConsummationDelay.Value;

var configurationSection = configuration.GetSection("Coravel:Queue:ConsummationDelay");
bool couldParseDelay = int.TryParse(configurationSection.Value, out var parsedDelay);
return couldParseDelay ? parsedDelay : 30;
}
}
}