Job Queueing With Commands
Work that can be processed in the background, such as sending emails, generating PDFs, or creating reports, and that doesn't require a result to be available immediately, is a prime candidate for the Job Queue functionality in FastEndpoints.
Job queues allow you to schedule Commands to be executed in the background while limiting how many instances of the same command type can execute at the same time. Being in control of the degree of parallelism is sometimes crucial for long-running or CPU intensive work, both to prevent the server from grinding to a halt and to stay within the rate limits of third-party services.
Queueing A Job
As with the Command Bus, the same ICommand and its companion ICommandHandler<TCommand> are used to define the data contract and the execution logic, such as the following:
public class MyCommand : ICommand
{
...
}
public class MyCommandHandler : ICommandHandler<MyCommand>
{
public Task ExecuteAsync(MyCommand command, CancellationToken ct)
{
...
}
}
When you need to queue a command as a job, instead of executing it immediately, simply call the extension method QueueJobAsync() on the command DTO like so:
await new MyCommand { ... }.QueueJobAsync();
A background job encapsulating the command is created and added to a queue for that type of command. There's a separate job queue for each command type in your application. If there are 10 command types, there will be 10 independent queues processing jobs.
Execution Options
At the time of queueing, it's possible to specify a future point in time after which the command/job is to be executed, instead of immediately (the default behavior if you don't specify anything). This does not, however, mean that the job will be executed at that exact time; it just won't be executed before that time.
The default expiry time of jobs is 4 hours from the time of creation, which you can override as shown below. If for some reason the job doesn't execute/complete successfully before the expiry time, it will be considered stale/incomplete and can be purged from the queue (which is discussed in the job persistence section below).
.QueueJobAsync(
executeAfter: DateTime.UtcNow.AddMinutes(30),
expireOn: DateTime.UtcNow.AddHours(8));
Enabling Job Queues
Job queues are not enabled by default and must be configured at startup. Since job queues are designed to be reliable and not lose data in case of server restarts/crashes, you are required to implement a storage provider for a database/storage medium of your choice. How to implement the storage provider is discussed below. For now, let's focus on the startup configuration.
var bld = WebApplication.CreateBuilder();
bld.Services
.AddFastEndpoints()
.AddJobQueues<JobRecord, JobStorageProvider>(); //ignore generic arguments for now
var app = bld.Build();
app.UseFastEndpoints()
.UseJobQueues();
app.Run();
Per Queue Execution Limits
By default, each queue will process multiple commands in parallel. The default limit is the number of logical processors on the machine. For example, if the server has 4 logical processors, at most 4 commands of the same type will execute at the same time. You can customize the max concurrency setting at startup like this:
.UseJobQueues(o => o.MaxConcurrency = 2);
Queued jobs can be given a certain amount of time to execute. Command executions exceeding that time limit will automatically be cancelled and retried. By default, however, commands are allowed to execute without a time limit. You can specify a maximum execution time like so:
.UseJobQueues(o => o.ExecutionTimeLimit = TimeSpan.FromSeconds(10));
Limits specified as above apply to all command types; you can override them per command type if needed:
.UseJobQueues(o =>
{
//general per queue limits
o.MaxConcurrency = 2;
o.ExecutionTimeLimit = TimeSpan.FromSeconds(10);
//applicable only to MyCommand
o.LimitsFor<MyCommand>(
maxConcurrency: 8,
timeLimit: TimeSpan.FromSeconds(5));
});
That's all the configuration needed (other than implementing the storage provider discussed below). As with the command bus, there's no need to register individual commands & handlers. They are auto discovered by the library.
Job Persistence
In order to provide the storage mechanism for job queues, two interfaces must be implemented.
- IJobStorageRecord for the job storage entity. (See example)
- IJobStorageProvider<TStorageRecord> for the storage provider. (See example)
The storage record entity is simply a POCO containing the actual command DTO together with some metadata. The storage provider class simply needs to delegate data access to whatever database/storage engine you use to store the jobs, as shown with the MongoDB example below.
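A record entity for that example might look roughly like the following. This is only a sketch: it assumes MongoDB.Entities' Entity base class (which supplies the ID property used by the provider) and lists the properties the provider below relies on. Check the IJobStorageRecord interface or the linked example for the authoritative member list.

sealed class JobRecord : Entity, IJobStorageRecord
{
    // metadata used by the job queue library (assumed IJobStorageRecord members)
    public Guid TrackingID { get; set; }
    public string QueueID { get; set; }
    public DateTime ExecuteAfter { get; set; }
    public DateTime ExpireOn { get; set; }
    public bool IsComplete { get; set; }

    // the actual command DTO being queued
    public object Command { get; set; }
}

And the storage provider itself: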
sealed class JobStorageProvider : IJobStorageProvider<JobRecord>
{
private readonly DBContext db; //MongoDB.Entities' DBContext
public JobStorageProvider(DBContext db)
{
this.db = db; //inject the dbContext
}
public Task StoreJobAsync(JobRecord job, CancellationToken ct)
{
// persist the provided job record to the database
return db.SaveAsync(job, ct);
}
public async Task<IEnumerable<JobRecord>> GetNextBatchAsync(PendingSearchParams<JobRecord> p)
{
// return a batch of pending jobs to be processed next
return await db
.Find<JobRecord>()
.Match(p.Match) //use the provided boolean lambda expression to match entities
.Limit(p.Limit) //either use the provided limit or choose your own
.ExecuteAsync(p.CancellationToken); //pass the provided cancellation token
}
public Task MarkJobAsCompleteAsync(JobRecord job, CancellationToken ct)
{
// either replace the supplied job record in the db.
// or do a partial update of just the 'IsComplete' property.
// or delete the entity now if batch deleting later is not preferred.
return db
.Update<JobRecord>()
.MatchID(job.ID)
.Modify(r => r.IsComplete, true)
.ExecuteAsync(ct);
}
public Task CancelJobAsync(Guid trackingId, CancellationToken ct)
{
// do a partial update of just the 'IsComplete' property.
// or delete the entity now if batch deleting later is not preferred.
return db.Update<JobRecord>()
.Match(r => r.TrackingID == trackingId)
.Modify(r => r.IsComplete, true)
.ExecuteAsync(ct);
}
public Task OnHandlerExecutionFailureAsync(JobRecord job, Exception e, CancellationToken c)
{
// this is called whenever execution of a command's handler fails.
// do nothing here if you'd like it to be automatically retried.
// or update the 'ExecuteAfter' property to reschedule it to a future time.
// or delete (or mark as complete) the entity if retry is unnecessary.
return db
.Update<JobRecord>()
.MatchID(job.ID)
.Modify(r => r.ExecuteAfter, DateTime.UtcNow.AddMinutes(1))
.ExecuteAsync(c);
}
public Task PurgeStaleJobsAsync(StaleJobSearchParams<JobRecord> p)
{
// this method is called hourly.
// do whatever you like with the stale (completed/expired) jobs.
return db.DeleteAsync(p.Match, p.CancellationToken);
}
}
The full source code of the above example is available on GitHub.
Using a document database such as MongoDB, LiteDB, etc. may be more suitable for implementing a job storage provider than EF Core & SQL Server, because an EF Core DbContext needs additional configuration both to support embedding command objects and to be used safely from multiple threads when registered as a singleton. See this example project that shows how to configure a pooled db context factory, which is the recommended way to use an EF Core DbContext in storage providers.
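If you do go the EF Core route, the setup might look roughly like this (a sketch only; JobDbContext is a hypothetical DbContext that maps the JobRecord entity, and the remaining IJobStorageProvider<JobRecord> members would follow the same create-a-context-per-call pattern):

// registration at startup: a pooled factory instead of a scoped DbContext
bld.Services.AddPooledDbContextFactory<JobDbContext>(o => o.UseSqlServer("<connection-string>"));

// inside the storage provider, with an injected IDbContextFactory<JobDbContext> 'dbFactory':
public async Task StoreJobAsync(JobRecord job, CancellationToken ct)
{
    await using var db = await dbFactory.CreateDbContextAsync(ct); // short-lived context per operation
    db.Add(job);
    await db.SaveChangesAsync(ct);
}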
Job Cancellations
Queued jobs can be cancelled anytime, from anywhere, using their Tracking Id. When cancelled, the job will not be picked up for execution. If the job is already running, cancellation is requested via the cancellation token passed down to the command handler. The handler should periodically check whether cancellation has been requested and gracefully stop execution without throwing exceptions. See example here.
var trackingId = await new LongRunningCommand().QueueJobAsync();
await JobTracker<LongRunningCommand>.CancelJobAsync(trackingId);
Use either the JobTracker<TCommand> generic class or inject an IJobTracker<TCommand> instance from the DI Container to access the CancelJobAsync() method.
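For illustration, a handler for the long-running command above might cooperate with cancellation roughly like this (a minimal sketch; the work loop and the pause between chunks are placeholders):

public class LongRunningCommandHandler : ICommandHandler<LongRunningCommand>
{
    public async Task ExecuteAsync(LongRunningCommand cmd, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested) // stop gracefully once cancellation is requested
        {
            // ...do the next small chunk of work here...
            await Task.Delay(1000); // placeholder pause between chunks of work
        }
    }
}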
Jobs With Results
A command that returns a result (ICommand<TResult>) can also be queued up as a job. The result can be retrieved from anywhere via the JobTracker using the job's Tracking Id. To enable support for job results, simply implement the following add-on interfaces:
public class JobRecord : IJobStorageRecord, IJobResultStorage
{
...
public object? Result { get; set; } // a property for storing the result
}
public class JobStorageProvider : IJobStorageProvider<JobRecord>, IJobResultProvider
{
...
public Task StoreJobResultAsync<TResult>(Guid trackingId, TResult result, CancellationToken ct)
{
// 1.) retrieve the job by trackingId.
// 2.) set the result on the job like so:
((IJobResultStorage)job).SetResult(result);
// 3.) persist the job entity back to the database.
}
public Task<TResult?> GetJobResultAsync<TResult>(Guid trackingId, CancellationToken ct)
{
// 1.) retrieve the job by trackingId.
// 2.) extract the result from the job like so:
var result = ((IJobResultStorage)job).GetResult<TResult>();
// 3.) return the result
}
}
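For reference, a result-returning command and its handler can be defined just as with the command bus (shown here as a sketch; MyResult stands in for whatever result DTO your command produces):

public class MyCommand : ICommand<MyResult>
{
    // ...command properties...
}

public class MyCommandHandler : ICommandHandler<MyCommand, MyResult>
{
    public Task<MyResult> ExecuteAsync(MyCommand command, CancellationToken ct)
    {
        // ...do the work and produce the result...
        return Task.FromResult(new MyResult());
    }
}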
Once the storage record entity and provider are set up, you can queue commands that return results as usual and use the job tracker to retrieve the result as follows:
// queue the command as a job and retrieve the tracking id
var trackingId = await new MyCommand { ... }.QueueJobAsync();
// retrieve the result of the command using the tracking id
var result = await JobTracker<MyCommand>.GetJobResultAsync<MyResult>(trackingId);
Use either the JobTracker<TCommand> generic class or inject an IJobTracker<TCommand> instance from the DI Container to access the GetJobResultAsync() method. The result will be default for value types and null for reference types until the command handler completes its work. Click here for an EF Core example.
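For example, a caller could simply poll until the handler has produced a result (a basic sketch; the polling interval is arbitrary):

MyResult? result = null;

while (result is null) // stays null until the command handler completes
{
    await Task.Delay(TimeSpan.FromSeconds(1)); // arbitrary polling interval
    result = await JobTracker<MyCommand>.GetJobResultAsync<MyResult>(trackingId);
}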