Request-Reply in Messaging
The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message will either be a command, which is an intention for service to carry out some work that results in a state change, or a query, which is a request for information.
Unlike request-reply constrained protocols like HTTP, NATS is not limited to a strict point-to-point interaction between a client and server. The request-reply pattern is built on top of the core publish-subscribe model.
By default, this means that any one of subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.
This example shows the basics of the request-reply pattern including the standard “no responders” error if there are no subscribers available to handle and reply to the requesting message.
$ nbe run messaging/request-reply/dotnet2View the source code or learn how to run this example yourself
Code
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NATS.Client.Core;
var stopwatch = Stopwatch.StartNew();
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Log($"[CON] Connecting to {url}...");
Connect to NATS server. Since connection is disposable at the end of our scope we should flush our buffers and close connection cleanly.
var opts = NatsOpts.Default with { Url = url };
await using var nats = new NatsConnection(opts);
Create a message event handler and then subscribe to the target
subject which leverages a wildcard greet.*
.
When a user makes a “request”, the client populates
the reply-to field and then listens (subscribes) to that
as a subject.
The responder simply publishes a message to that reply-to.
await using var sub = await nats.SubscribeCoreAsync<int>("greet.*");
var reader = sub.Msgs;
var responder = Task.Run(async () =>
{
await foreach (var msg in reader.ReadAllAsync())
{
var name = msg.Subject.Split('.')[1];
Log($"[REP] Received {msg.Subject}");
await Task.Delay(500);
await msg.ReplyAsync($"Hello {name}!");
}
});
Make a request and wait a most 1 second for a response.
var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(2) };
Log("[REQ] From joe");
var reply = await nats.RequestAsync<int, string>("greet.joe", 0, replyOpts: replyOpts);
Log($"[REQ] {reply.Data}");
Log("[REQ] From sue");
reply = await nats.RequestAsync<int, string>("greet.sue", 0, replyOpts: replyOpts);
Log($"[REQ] {reply.Data}");
Log("[REQ] From bob");
reply = await nats.RequestAsync<int, string>("greet.bob", 0, replyOpts: replyOpts);
Log($"[REQ] {reply.Data}");
Once we unsubscribe there will be no subscriptions to reply.
await sub.UnsubscribeAsync();
await responder;
Now there is no responder our request will timeout.
try
{
reply = await nats.RequestAsync<int, string>("greet.joe", 0, replyOpts: replyOpts);
Log($"[REQ] {reply.Data} - This will timeout. We should not see this message.");
}
catch (NatsNoReplyException)
{
Log("[REQ] timed out!");
}
That’s it! We saw how we can create a responder and request data from it. We also set request timeouts to make sure we can move on when there is no response to our requests.
Log("Bye!");
return;
void Log(string log) => Console.WriteLine($"{stopwatch.Elapsed} {log}");
Output
00:00:00.0008194 [CON] Connecting to nats://nats:4222... 00:00:00.1225386 [REQ] From joe 00:00:00.1374172 [REP] Received greet.joe 00:00:00.6489792 [REQ] Hello joe! 00:00:00.6490418 [REQ] From sue 00:00:00.6498312 [REP] Received greet.sue 00:00:01.1512245 [REQ] Hello sue! 00:00:01.1512731 [REQ] From bob 00:00:01.1518886 [REP] Received greet.bob 00:00:01.6531626 [REQ] Hello bob! 00:00:03.6546766 [REQ] timed out! 00:00:03.6547387 Bye!
Install
NATS.Net
from NuGet.