Remote Procedure Calls With gRPC

Remote Command Bus

It's possible to have command classes live on one server and their respective handler classes on a completely different remote server. When a command is executed, it goes over the wire to the remote server that hosts its handler. Once the handler completes, the result is returned to the requesting server transparently, as if it were an in-process call. Keep in mind, however, that you'll pay a slight performance penalty from network latency and DTO serialization.

This feature is based on the gRPC for .NET package. By default, the communication channel is insecure (unencrypted) for ease of development, and transient network errors cause operations to be retried automatically. See the default channel options here. When deploying to production, make sure to set up TLS for the server (the one that hosts the handlers) as described here if the servers are not located in the same internal network/vnet. Ideally, for speedy communication, place all the servers in the same local network instead of having them communicate over the internet.

Project Setup

Instead of the traditional gRPC .proto files, our contract is simply defined by a Command & Result DTO combo. Typically you'd have a multi-project solution with at least 3 projects:

  1. Contracts - Where the command and result DTOs reside.
  2. Server - Where the command handlers are hosted.
  3. Client - Where the command execution is initiated.

The Shared Contracts Project

This project contains only the command/result DTOs such as these:

public class CreateOrderCommand : ICommand<CreateOrderResult>
{
    public int OrderId { get; set; }
    public string CustomerName { get; set; }
}

public class CreateOrderResult
{
    public string Message { get; set; }
}

This project only needs to reference the FastEndpoints core messaging library (netstandard2.1):

  <ItemGroup>
    <PackageReference Include="FastEndpoints.Messaging.Core" Version="5.*" />
  </ItemGroup>

The Handler Server Project

This project hosts the command handlers served via Kestrel as gRPC endpoints and it needs to reference the Contracts project as well as the FastEndpoints remote messaging library (net6.0+).

  <ItemGroup>
    <ProjectReference Include="..\Contracts\Contracts.csproj" />
    <PackageReference Include="FastEndpoints.Messaging.Remote" Version="5.*" />
  </ItemGroup>

Implement the handler like this:

public sealed class CreateOrderHandler : ICommandHandler<CreateOrderCommand, CreateOrderResult>
{
    public Task<CreateOrderResult> ExecuteAsync(CreateOrderCommand cmd, CancellationToken _)
    {
        return Task.FromResult(new CreateOrderResult()
        {
            Message = $"Order {cmd.OrderId} created for {cmd.CustomerName}"
        });
    }
}

Add the handler server middleware to the ASP.NET pipeline and map/register the individual handlers as shown below:

using Microsoft.AspNetCore.Server.Kestrel.Core; // provides HttpProtocols

var bld = WebApplication.CreateBuilder();

// Accept only HTTP/2 to allow insecure connections for development.
bld.WebHost.ConfigureKestrel(o => o.ListenLocalhost(6000, o => o.Protocols = HttpProtocols.Http2));

bld.AddHandlerServer();

var app = bld.Build();

app.MapHandlers(h =>
{
    h.Register<CreateOrderCommand, CreateOrderHandler, CreateOrderResult>();
});

app.Run();

Client / Command Initiator Project

This application (net6.0+) also needs to reference both the Contracts project and the FastEndpoints remote messaging library like so:

  <ItemGroup>
    <ProjectReference Include="..\Contracts\Contracts.csproj" />
    <PackageReference Include="FastEndpoints.Messaging.Remote" Version="5.*" />
  </ItemGroup>

Map the remote handler server by specifying its address and register the commands for that remote connection as shown below:

var bld = WebApplication.CreateBuilder();
var app = bld.Build();
app.MapRemote("http://localhost:6000", c =>
{
    c.Register<CreateOrderCommand, CreateOrderResult>();
});

INFO

A particular type of command can only be associated with a single remote connection (handler server). I.e. if you register the same command inside of multiple MapRemote(...) calls, only the last call will be effective.

You are then able to execute the command as you typically do using the appropriately named RemoteExecuteAsync() extension method like so:

var result = await new CreateOrderCommand
{
    OrderId = 1001,
    CustomerName = "Holly Simms"
}
.RemoteExecuteAsync();

Voila!

That's all there is to it. No need for .proto file creation, service classes or decorating properties of DTOs with attributes. This RPC functionality is mainly geared towards easily moving parts of your FastEndpoints monoliths out to remote servers as microservices (initially without a message broker) or for any other synchronous messaging needs.

With a bit of extra effort/configuration, it would be possible to enable a handler server to accept incoming gRPC requests/commands from many hundreds of clients (.NET only, i.e. end users who would be the initiators of the commands). You'd be configuring authentication & authorization as described here, but it's recommended to just use FastEndpoints REST endpoints for this purpose instead.

Client Streaming & Server Streaming

As with typical gRPC, it's possible to send a stream of items/DTOs to the handler server and get back a result once the stream ends with Client Streaming.

With Server Streaming, the client sends a single command to the handler server and gets back a continuous stream of results.

Samples of both as well as the full source code of the above example can be found on GitHub.

Setting Client/Channel Options

GrpcChannelOptions can be specified at startup like so:

app.MapRemote("http://localhost:6000", c =>
{
    c.ChannelOptions.MaxRetryAttempts = 5;
    c.ChannelOptions.HttpHandler = new() { ... };
    c.ChannelOptions.ServiceConfig = new() { ... };
});

Client Call Options

CallOptions can be handed to the RemoteExecuteAsync() method like so:

var result = await new MyCommand
{
    ...
}
.RemoteExecuteAsync(new CallOptions(...));

Binary Serializer

MessagePack binary serialization is used (instead of Protobuf) with its contractless resolver (which eliminates the need for annotating properties) together with Lz4BlockArray compression to (de)serialize commands/results.


Remote Pub/Sub Event Queues

Please refer to the Command Bus section above for an introduction to the gRPC implementation in FastEndpoints. This section only describes the event queuing system that employs the same underlying gRPC machinery.

Event Bus Vs. Event Queue

Even though the two patterns may look similar due to the use of the same interfaces (IEvent & IEventHandler<TEvent>), they behave quite differently.

Event Bus (In Process):

When events are published, the corresponding handlers (subscribers) are executed in-line/synchronously within the same process and there's no application/network boundary to cross. Each event that's published waits until the handler completes processing the event (unless otherwise instructed). If the handler throws an exception, the event publisher/initiator can catch and handle it.

Event Queue (Remote):

Due to the disconnected nature of the event publisher & subscribers, it is not possible to do the handler executions synchronously/in-line in a performant manner.

Instead, both the publisher and subscribers have their own internal/in-memory queues (by default). When an event gets published, it gets added to the internal queue for later distribution to the subscribers. The events are seamlessly streamed to the subscribers and the handlers located on the subscribers only get executed if/when the events are finally received by them (usually in a matter of milliseconds).

If there are transient network communication issues, the operations are automatically retried in order to deliver the events to all subscribers without losing/missing any events.

It is not possible for the publisher to know which subscribers successfully executed the handlers or if they were even executed at all. Basically this is a "best effort" fire-n-forget asynchronous event broadcasting system. You will have to establish out-of-band communication with the subscribers if you want to check the status/progress of the handler executions. An appropriate RPC command option described above can be used for this purpose.

WARNING

Since both publishers & subscribers hold pending events in memory (by default), it is possible to lose events if the processes are killed/restarted while there are pending operations or if the internal queues get overflowed due to slow processing of handlers/subscribers.

Bottom Line: If you need a reliable event broadcasting system, either use a dedicated message broker or implement a custom storage provider for event persistence as described here. If using the default in-memory storage provider, use it only as the means of transport/distribution of events to the remote nodes and create some sort of job queue system around it so that losing events is acceptable.
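To make the overflow scenario from the warning concrete, here is a minimal sketch in plain C#. It is not the FastEndpoints implementation; the `capacity` value and the `TryEnqueue` helper are hypothetical and only illustrate how a bounded in-memory queue ends up discarding events when the consumer is too slow to drain it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical limit, chosen only for this illustration.
const int capacity = 3;

var pending = new Queue<int>();  // events waiting to be delivered
var dropped = new List<int>();   // events that were lost due to overflow

// Accepts an event only while there is room in the queue.
bool TryEnqueue(int eventId)
{
    if (pending.Count >= capacity)
    {
        dropped.Add(eventId); // queue is full: the event is discarded
        return false;
    }
    pending.Enqueue(eventId);
    return true;
}

// Publish five events while nothing is being consumed.
for (var id = 1; id <= 5; id++)
    TryEnqueue(id);

Console.WriteLine($"pending: {string.Join(",", pending)}; dropped: {string.Join(",", dropped)}");
```

With a persistent storage provider, the events in `dropped` would instead be written to durable storage and picked up later.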

Setup & Usage

Similarly to the RPC Command Bus described above, you'd typically be using a multi-project solution with at least 3 projects.

Shared Contracts Project

Create the event DTO/model class by marking it with the IEvent interface:

public sealed class SomethingHappened : IEvent
{
    public int Id { get; set; }
    public string Description { get; set; }
}

Event Publisher Project (gRPC Server)

Set up the gRPC server and register an event hub for the SomethingHappened event.

using Microsoft.AspNetCore.Server.Kestrel.Core; // provides HttpProtocols

var bld = WebApplication.CreateBuilder();
bld.WebHost.ConfigureKestrel(o => o.ListenLocalhost(6000, o => o.Protocols = HttpProtocols.Http2));
bld.AddHandlerServer();

var app = bld.Build();
app.MapHandlers(h =>
{
    h.RegisterEventHub<SomethingHappened>();
});
app.Run();

Event Subscriber Project (gRPC Client)

Create the event handler by implementing the IEventHandler<TEvent> interface.

internal class WhenSomethingHappens : IEventHandler<SomethingHappened>
{
    private readonly ILogger<WhenSomethingHappens> _logger;

    public WhenSomethingHappens(ILogger<WhenSomethingHappens> logger)
    {
        _logger = logger;
    }

    public Task HandleAsync(SomethingHappened evnt, CancellationToken ct)
    {
        _logger.LogInformation("{number} - {description}", evnt.Id, evnt.Description);
        return Task.CompletedTask;
    }
}

Map the remote connection to the server/publisher and subscribe to the broadcast.

var bld = WebApplication.CreateBuilder();
var app = bld.Build();

app.MapRemote("http://localhost:6000", c =>
{
    c.Subscribe<SomethingHappened, WhenSomethingHappens>();
});
app.Run();

Broadcast/Publish Events On the Server

Once everything is wired up as above, all that's left to do is publish events from the server like so:

new SomethingHappened
{
    Id = 1,
    Description = "I am a test event!"
}
.Broadcast();

The full source code for the above examples is available on this GitHub repo.

Reliable Event Queues With Persistence

As explained above, the default in-memory event queue storage provider may cause some events to be lost in exceptional circumstances, which is a typical weakness of event queues without persistence.

At the expense of overhead incurred for disk/database access, it's quite simple to add persistence by implementing your own event storage providers for both the subscribers and publishers.

Publisher Persistence

First, define an event storage record entity by implementing the interface IEventStorageRecord such as this example.

Then, create an implementation of IEventHubStorageProvider such as this example for MongoDB.

Next, specify your implementations at startup setting them as generic parameters to the MapHandlers call:

app.MapHandlers<EventRecord, HubStorageProvider>(h =>
{
    h.RegisterEventHub<SomethingHappened>();
});

Subscriber Persistence

Similarly to a publisher, you'd be implementing a storage entity model by implementing IEventStorageRecord such as this example.

You'd also create a storage provider by implementing IEventSubscriberStorageProvider, such as this example for MongoDB.

Your implementations for the subscriber event storage can be specified like so at startup:

bld.Services.AddEventSubscriberStorageProvider<EventRecord, SubscriberStorageProvider>();

Now, pending events will not be held in memory and in case of interruptions, things will be picked up from where they left off.

If using EF Core as the ORM, see here how to configure it to support storing event storage entities.

Event Queue Error Notifications

You have the choice of taking some action when errors occur in the publisher & subscriber event queues/storage providers. This is totally optional and the default behavior is to simply log the issues and retry the operations. Subscribing to these error notifications may be beneficial in case you'd like to do something like the following:

  • Notify a human when event handler execution fails repeatedly causing the in-memory queues to overflow.
  • Move an event to a dead-letter-queue if it's failed for a given number of attempts.
  • Reschedule a failed event to be retried at a future date/time.
  • Discard an event that failed repeatedly for a given number of tries.

You'd be hooking in to the error notifications by implementing the abstract classes SubscriberExceptionReceiver and/or EventHubExceptionReceiver and overriding the virtual methods you're interested in.

Subscriber Exception Receiver Methods:

  • OnStoreEventRecordError(...): triggered when the storage provider has trouble persisting an event record.
  • OnGetNextEventRecordError(...): triggered when the storage provider has trouble retrieving the next event record.
  • OnHandlerExecutionError(...): triggered when the event handler has trouble executing the HandleAsync(...) method.
  • OnMarkEventAsCompleteError(...): triggered when the storage provider has trouble marking an event record as complete.

Your error receiver implementation would look something like the following. Dependency injection via constructor is also supported.

internal sealed class MySubErrorReceiver : SubscriberExceptionReceiver
{
    public override async Task OnMarkEventAsCompleteError<TEvent>(IEventStorageRecord record,
                                                                  int attemptCount,
                                                                  Exception exception,
                                                                  CancellationToken ct)
    {
        //do whatever you like here
    }
}

You then need to register it in DI at startup like so:

bld.Services.AddSubscriberExceptionReceiver<MySubErrorReceiver>();
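What goes inside a receiver method is up to you. As one possibility, the sketch below shows a retry policy that could drive the "reschedule" and "dead-letter" actions listed earlier. It is plain C# with no FastEndpoints types; `maxAttempts` and `NextRetryDelay` are hypothetical names, and it assumes the `attemptCount` passed to the receiver starts at 1.

```csharp
using System;

// Hypothetical threshold, not a FastEndpoints setting.
const int maxAttempts = 5;

// Returns null when the event should be given up on (e.g. moved to a
// dead-letter queue); otherwise returns the delay before the next retry.
// The delay doubles with each attempt, starting at 1 second.
TimeSpan? NextRetryDelay(int attemptCount)
{
    if (attemptCount >= maxAttempts)
        return null;

    return TimeSpan.FromSeconds(Math.Pow(2, attemptCount - 1));
}

for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
    var delay = NextRetryDelay(attempt);
    Console.WriteLine(delay is null
        ? $"attempt {attempt}: give up / dead-letter"
        : $"attempt {attempt}: retry in {delay.Value.TotalSeconds}s");
}
```

A receiver override could call such a helper with the `attemptCount` it receives and act on the result.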

Publisher/Hub Exception Receiver Methods:

  • OnGetNextEventRecordError(...): triggered when the storage provider has trouble retrieving the next event record.
  • OnMarkEventAsCompleteError(...): triggered when the storage provider has trouble marking an event record as complete.
  • OnStoreEventRecordError(...): triggered when the storage provider has trouble persisting an event record.
  • OnInMemoryQueueOverflow(...): triggered when the default in-memory storage provider's internal queue for a given event type has been stagnant and in an overflow state, causing the current event to not be accepted into the queue and discarded.

A typical hub/publisher error receiver would look something like this:

internal sealed class MyHubErrorReceiver : EventHubExceptionReceiver
{
    public override Task OnGetNextEventRecordError<TEvent>(string subscriberID,
                                                           int attemptCount,
                                                           Exception exception,
                                                           CancellationToken ct)
    {
        //do whatever you like here
        return Task.CompletedTask;
    }
}

Register it in DI like so:

bld.Services.AddEventHubExceptionReceiver<MyHubErrorReceiver>();

Event Broker Mode

By default, when you register an event hub via RegisterEventHub<TEvent>(), the hub doesn't accept any events from remote clients/publishers. Only the server itself can broadcast events to its subscribers.

If there's a requirement to allow remote/external event publishers to send events to the hub, you can easily configure the event hub to act as a relay (event broker) and distribute the events received from external publishers to the connected subscribers. To enable the event broker mode, simply do the following on the gRPC server:

Hub Setup

app.MapHandlers(h =>
{
    h.RegisterEventHub<SomethingHappened>(HubMode.EventBroker);
});

In event broker mode, both the server itself and external publishers can issue events.

Remote Publisher Setup

An external publisher app needs to be configured like so during startup:

app.MapRemote("http://localhost:6000", c =>
{
    c.RegisterEvent<SomethingHappened>();
});

Provide the address of the gRPC server that hosts the event hub/broker and register the type of the event model. After that, the publisher app can send events to the broker as usual like so:

await new SomethingHappened
{
    Id = 101,
    Description = "Hello World!"
}
.RemotePublishAsync();

Do note that the RemotePublishAsync() extension method is a synchronous/in-line messaging operation. Internally it's a unary gRPC call. If it completes without throwing an exception, the broker server has accepted the event and queued it for broadcasting to the subscribers connected to it. As with all gRPC operations, any transient errors will cause the operation to be automatically retried. I.e. sending an event to a broker is not a fire-n-forget operation, and your code needs to handle any exceptions that may be thrown due to unrecoverable network issues or unavailability of the broker.
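Since a failed publish surfaces as an exception, the calling code may want to wrap the call. The following is a minimal, self-contained sketch of that idea. `TryPublishAsync` is a hypothetical helper (not part of FastEndpoints), and the two delegates passed to it merely stand in for a publish call that succeeds and one that fails.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs any publish delegate and reports success or
// failure instead of letting the exception escape to the caller.
async Task<bool> TryPublishAsync(Func<Task> publish)
{
    try
    {
        await publish();
        return true; // no exception: the broker accepted the event
    }
    catch (Exception ex)
    {
        // Retries have already been exhausted at this point, so decide here
        // whether to log, alert, or store the event for a later attempt.
        Console.WriteLine($"Publish failed: {ex.Message}");
        return false;
    }
}

// Stand-ins for a reachable broker and an unavailable one.
var accepted = await TryPublishAsync(() => Task.CompletedTask);
var rejected = await TryPublishAsync(
    () => Task.FromException(new InvalidOperationException("broker unavailable")));

Console.WriteLine($"accepted: {accepted}, rejected: {rejected}");
```

In a real publisher app, the delegate would wrap the RemotePublishAsync() call shown above. Catching a narrower exception type than `Exception` is preferable once you know which exceptions your setup actually throws.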

See this GitHub repo for a complete example of an Event Broker in use together with an external publisher and subscriber.

Round-Robin Mode

Typically an event hub sends an event to all known subscribers, so each subscriber receives a copy of the exact same events. Event hubs can instead be configured to deliver each event to just one of the connected subscribers in a round-robin fashion. Say, for example, there are 3 events being published and 2 subscribers, A and B, connected to the hub. In round-robin mode, they will be delivered like so:

  • Event 1 -> Subscriber A
  • Event 2 -> Subscriber B
  • Event 3 -> Subscriber A

And so on. No two subscribers will ever receive the same event. This comes in handy when you want to distribute the workload to a pool of remote nodes, while ensuring that only one subscriber will be processing a particular event.
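The delivery order above can be reproduced with a few lines of plain C#. This is only an illustration of round-robin selection, not how the hub is implemented internally; `PickSubscriber` and the subscriber names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

var subscribers = new[] { "A", "B" };
var deliveries = new List<string>();

// Event numbers start at 1, so event N goes to subscriber (N - 1) mod count.
string PickSubscriber(int eventNumber) =>
    subscribers[(eventNumber - 1) % subscribers.Length];

for (var eventNumber = 1; eventNumber <= 3; eventNumber++)
    deliveries.Add($"Event {eventNumber} -> Subscriber {PickSubscriber(eventNumber)}");

foreach (var line in deliveries)
    Console.WriteLine(line);
```

Each event is handed to exactly one subscriber, and the selection wraps around once the end of the subscriber list is reached.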

Hub Setup (gRPC Server)

app.MapHandlers(h =>
{
    h.RegisterEventHub<SomeEvent>(HubMode.RoundRobin);
});

You just need to change the hub-mode when configuring the app. The subscriber (gRPC client) setup and event publishing would be the same.

Combining Event Broker & Round-Robin Modes

If you'd like a hub to act as a broker as well while in round-robin mode, configure the hub like this:

.RegisterEventHub<SomeEvent>(HubMode.EventBroker | HubMode.RoundRobin);

© FastEndpoints 2024