Job Queueing With Commands

Work that can be processed in the background and doesn't need to produce an immediate result, such as sending emails, generating PDFs, or creating reports, is a prime candidate for the Job Queue functionality in FastEndpoints.

Job queues let you schedule Commands for background execution while limiting how many command instances of the same type can execute at the same time. Controlling the degree of parallelism is sometimes crucial for certain types of tasks, such as long-running or CPU-intensive work, both to prevent the server from grinding to a halt and to stay within the access control limits of third-party services.

Queueing A Job

As with the Command Bus, the same ICommand and its companion ICommandHandler<TCommand> are used to define the data contract and the execution logic, as in the following:

public class MyCommand : ICommand
{
    ...
}

public class MyCommandHandler : ICommandHandler<MyCommand>
{
    public Task ExecuteAsync(MyCommand command, CancellationToken ct)
    {
        ...
    }
}

When you need to queue a command as a job, instead of executing it immediately, simply call the extension method QueueJobAsync() on the command DTO like so:

await new MyCommand { ... }.QueueJobAsync();

A background job encapsulating the command is created and added to the queue for that command type. There is one job queue per command type in your application: if there are 10 command types, there will be 10 independent queues processing jobs.
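
To picture the one-queue-per-command-type arrangement, here is a minimal sketch. It is not FastEndpoints code: the `TypedQueues` class and its members are hypothetical names, and the only assumption is that jobs are grouped by the runtime type of the command.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical illustration; not FastEndpoints code.
public sealed class TypedQueues
{
    // One independent queue per command type, keyed by the command's runtime type.
    private readonly ConcurrentDictionary<Type, ConcurrentQueue<object>> _queues =
        new ConcurrentDictionary<Type, ConcurrentQueue<object>>();

    // Adds the command to the queue for its own type, creating that queue on first use.
    public void Enqueue(object command) =>
        _queues.GetOrAdd(command.GetType(), _ => new ConcurrentQueue<object>()).Enqueue(command);

    // Number of distinct queues created so far (one per command type seen).
    public int QueueCount => _queues.Count;

    // Number of jobs waiting in the queue for TCommand.
    public int PendingFor<TCommand>() =>
        _queues.TryGetValue(typeof(TCommand), out var queue) ? queue.Count : 0;
}
```

Enqueueing commands of two different types into such a structure yields two queues, each of which could then be drained independently of the other.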

Execution Options

At the time of queueing, you can specify a future point in time after which the command/job is to be executed, instead of immediately (the default behavior if you don't specify anything). This does not mean that the job will be executed at exactly that time, only that it will not be executed before it.

The default expiry time of jobs is 4 hours from the time of creation, which you can override as shown below. If for some reason the job doesn't execute/complete successfully before the expiry time, it will be considered stale/incomplete and can be purged from the queue (which is discussed in the job persistence section below).

.QueueJobAsync(
    executeAfter: DateTime.UtcNow.AddMinutes(30),
    expireOn: DateTime.UtcNow.AddHours(8));
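
The "not before, not after" semantics of these two options can be expressed as a pair of predicates. The following is only a sketch under stated assumptions: `SketchJob` and `JobEligibility` are hypothetical names, the `ExpireOn` property name is assumed from the `expireOn` parameter, and the exact boundary comparisons used by the library may differ.

```csharp
using System;

// Hypothetical record shape for illustration; ExecuteAfter and IsComplete
// mirror names used on this page, ExpireOn is an assumed property name.
public sealed class SketchJob
{
    public DateTime ExecuteAfter { get; set; }
    public DateTime ExpireOn { get; set; }
    public bool IsComplete { get; set; }
}

public static class JobEligibility
{
    // A job may be picked up only when it is incomplete, its ExecuteAfter
    // time has been reached, and its expiry time has not yet been reached.
    public static bool IsDue(SketchJob job, DateTime utcNow) =>
        !job.IsComplete && job.ExecuteAfter <= utcNow && utcNow < job.ExpireOn;

    // A job is stale once it is complete or its expiry time has been reached.
    public static bool IsStale(SketchJob job, DateTime utcNow) =>
        job.IsComplete || job.ExpireOn <= utcNow;
}
```

With `executeAfter` set 30 minutes ahead and `expireOn` 8 hours ahead, such a job would not be due now, would become due at the 30 minute mark, and would count as stale from the 8 hour mark onward if it had not completed.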

Enabling Job Queues

Job queues are not enabled by default and must be configured at startup. Since job queues are designed to be reliable and not lose data in case of server restarts/crashes, you are required to implement a storage provider on a database/storage medium of your choice. How to implement the storage provider is discussed below. For now, let's focus on the startup configuration.

program.cs
var bld = WebApplication.CreateBuilder();
bld.Services
   .AddFastEndpoints()
   .AddJobQueues<JobRecord, JobStorageProvider>(); //ignore generic arguments for now

var app = bld.Build();
app.UseFastEndpoints()
   .UseJobQueues();
app.Run();

Per Queue Execution Limits

By default, each queue processes multiple commands in parallel, up to the number of logical processors on the machine. For example, if the server has 4 cores/threads, at most 4 commands of the same type will execute at the same time. You can customize the max concurrency setting at startup like this:

.UseJobQueues(o => o.MaxConcurrency = 2);
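
To show what a concurrency cap means in practice, here is a minimal sketch that bounds parallel execution with a `SemaphoreSlim`. It illustrates the general technique only and makes no claim about how FastEndpoints implements the limit internally; `BoundedRunner` and `RunAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration; not FastEndpoints code.
public static class BoundedRunner
{
    // Runs every job, letting at most maxConcurrency of them execute at once.
    // Returns the highest number of jobs observed running at the same time.
    public static async Task<int> RunAsync(IEnumerable<Func<Task>> jobs, int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var sync = new object();
        int running = 0, peak = 0;

        var tasks = jobs.Select(async job =>
        {
            await gate.WaitAsync(); // wait for a free slot
            try
            {
                lock (sync) { running++; peak = Math.Max(peak, running); }
                await job();
            }
            finally
            {
                lock (sync) { running--; }
                gate.Release(); // free the slot for the next job
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Running ten delayed jobs through `RunAsync` with a limit of 2 would never have more than two of them in flight, which is the behavior the `MaxConcurrency` setting describes for a single queue.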

Queued jobs can be given a maximum amount of time to execute. Command executions that exceed this time limit are automatically cancelled and retried. By default, however, commands are allowed to execute without a time limit. You can specify a maximum execution time like so:

.UseJobQueues(o => o.ExecutionTimeLimit = TimeSpan.FromSeconds(10));
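
A time limit of this kind only works if the handler observes the CancellationToken it is given. The sketch below shows the general pattern with a `CancellationTokenSource` that cancels after a timeout; `TimeLimitedRunner` and `TryRunAsync` are hypothetical names, and this is not a description of the library's internals.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration; not FastEndpoints code.
public static class TimeLimitedRunner
{
    // Returns true if the handler finished within the limit,
    // false if it was cancelled for exceeding the limit.
    public static async Task<bool> TryRunAsync(
        Func<CancellationToken, Task> handler,
        TimeSpan limit,
        CancellationToken appStopping = default)
    {
        // Link to an outer token so that shutdown also cancels the handler.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(appStopping);
        cts.CancelAfter(limit);
        try
        {
            await handler(cts.Token);
            return true;
        }
        catch (OperationCanceledException) when (!appStopping.IsCancellationRequested)
        {
            return false; // timed out; a caller could schedule a retry here
        }
    }
}
```

A handler that passes the token down to its async calls (for example `Task.Delay(5000, ct)`) is interrupted once the limit elapses, whereas a handler that ignores the token would keep running regardless.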

Limits set directly on the options object apply to all command types. You can override them per command type if needed:

.UseJobQueues(o =>
{
    //general per queue limits
    o.MaxConcurrency = 2; 
    o.ExecutionTimeLimit = TimeSpan.FromSeconds(10);
    
    //applicable only to MyCommand
    o.LimitsFor<MyCommand>( 
        maxConcurrency: 8,
        timeLimit: TimeSpan.FromSeconds(5));
});

That's all the configuration needed (other than implementing the storage provider discussed below). As with the command bus, there's no need to register individual commands & handlers. They are auto discovered by the library.

Job Persistence

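In order to provide the storage mechanism for job queues, two interfaces must be implemented: IJobStorageRecord for the job storage record entity, and IJobStorageProvider<TStorageRecord> for the storage provider.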

The storage record entity is simply a POCO containing the actual command DTO together with some metadata. As for the storage provider class, it simply needs to delegate data access to whatever database/storage engine that stores the jobs as shown with the MongoDB example below:

sealed class JobStorageProvider : IJobStorageProvider<JobRecord>
{
    private readonly DbContext db;

    public JobStorageProvider(DbContext db)
    {
        this.db = db; //inject the dbContext
    }

    public Task StoreJobAsync(JobRecord job, CancellationToken ct)
    {
        // persist the provided job record to the database
        return db.SaveAsync(job, ct);
    }

    public async Task<IEnumerable<JobRecord>> GetNextBatchAsync(PendingSearchParams<JobRecord> p)
    {
        // return a batch of pending jobs to be processed next
        return await db
            .Find<JobRecord>()
            .Match(p.Match) //use the provided boolean lambda expression to match entities
            .Limit(p.Limit) //either use the provided limit or choose your own
            .ExecuteAsync(p.CancellationToken); //pass the provided cancellation token
    }

    public Task MarkJobAsCompleteAsync(JobRecord job, CancellationToken ct)
    {
        // either replace the supplied job record in the db.
        // or do a partial update of just the 'IsComplete' property.
        // or delete the entity now if batch deleting later is not preferred.
        return db
            .Update<JobRecord>()
            .MatchID(job.ID)
            .Modify(r => r.IsComplete, true)
            .ExecuteAsync(ct);
    }

    public Task OnHandlerExecutionFailureAsync(JobRecord job, Exception e, CancellationToken c)
    {
        // this is called whenever execution of a command's handler fails.
        // do nothing here if you'd like it to be automatically retried.
        // or update the 'ExecuteAfter' property to reschedule it to a future time.
        // or delete (or mark as complete) the entity if retry is unnecessary.
        return db
            .Update<JobRecord>()
            .MatchID(job.ID)
            .Modify(r => r.ExecuteAfter, DateTime.UtcNow.AddMinutes(1))
            .ExecuteAsync(c);
    }

    public Task PurgeStaleJobsAsync(StaleJobSearchParams<JobRecord> p)
    {
        // this method is called hourly.
        // do whatever you like with the stale (completed/expired) jobs.
        return db.DeleteAsync(p.Match, p.CancellationToken);
    }
}

The full source code of the above example is available on GitHub.

TIP

Using a document database such as MongoDB, LiteDB, etc. may be more suitable for implementing a job storage provider than EF Core & SQL Server. The EF Core DbContext needs additional configuration to support embedding command objects and to support multithreading when used as a singleton. See this example project that shows how to configure a pooled db context factory, which is the recommended way to use EF Core DbContext in storage providers.


© FastEndpoints 2024