Request-Reply in Messaging
The request-reply pattern allows a client to send a message and expect a reply of some kind. In practice, the request message will either be a command, which is an intention for service to carry out some work that results in a state change, or a query, which is a request for information.
Unlike request-reply constrained protocols like HTTP, NATS is not limited to a strict point-to-point interaction between a client and server. The request-reply pattern is built on top of the core publish-subscribe model.
By default, this means that any one of subscribers could be a responder and reply to the client. However, because NATS is not limited to point-to-point interactions, the client could indicate to NATS that multiple replies should be allowed.
This example shows the basics of the request-reply pattern including the standard “no responders” error if there are no subscribers available to handle and reply to the requesting message.
$ nbe run messaging/request-reply/elixirView the source code or learn how to run this example yourself
Code
Mix.install([
For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html
{:gnat, "~> 1.7.1"},
{:jason, "~> 1.0"}
])
url = System.get_env("NATS_URL", "nats://0.0.0.0:4222")
uri = URI.parse(url)
Call start_link
on Gnat
to start the Gnat application supervisor
{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})
The Gnat.ConnectionSupervisor
is a process that monitors your NATS connection. If connection
is lost, this process will retry according to its backoff settings to re-establish a connection.
You can communicate with NATS without this, but we recommend using supervised connections
and consumers
gnat_supervisor_settings = %{
name: :gnat,
backoff_period: 4_000,
connection_settings: [
%{host: uri.host, port: uri.port}
]
}
{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)
Give the connection time to establish (this is only needed for these examples and not in production)
if Process.whereis(:gnat) == nil do
Process.sleep(300)
end
Now let’s set up a consumer supervisor. We use the subscription_topics
field in the configuration
map to set up a list of subscriptions which can optionally have queue names.
consumer_supervisor_settings = %{
connection_name: :gnat,
This is the name of a module defined below
module: ExampleService,
subscription_topics: [
%{topic: "request.demo"},
]
}
Starting the consumer supervisor will create the subscription and will monitor the connection to re-establish subscriptions in case of failure
{:ok , _pid} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)
This is a module that conforms to the Gnat.Server
behavior. The name of this module
matches the module
field in the consumer supervisor settings.
defmodule ExampleService do
use Gnat.Server
This handler will simulate an error
def request(%{topic: "request.demo", body: body}) when body == "failure" do
{:error, "something went wrong!"}
end
This handler is matching just on the subject
def request(%{topic: "request.demo", body: body}) do
IO.puts "Demo request received message: #{inspect(body)}"
{:reply, "This is a demo"}
end
Defining an error handler is optional, the default one will just call Logger.error for you
def error(%{gnat: gnat, reply_to: reply_to}, error) do
Gnat.pub(gnat, reply_to, "An error occurred: #{inspect(error)}")
end
end
Now that we know there’s an active subscription on the request.demo
subject, we can
make a request and process the reply using Gnat.request
{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "input data")
IO.puts("First result: #{res}")
{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "more data")
IO.puts("Second result: #{res}")
Now cause a failure so we can see error responses
{:ok, %{body: res}} = Gnat.request(:gnat, "request.demo", "failure")
IO.puts("Failure result: #{res}")
Output
* creating /root/.mix/archives/hex-2.0.6 Resolving Hex dependencies... Resolution completed in 0.051s New: cowlib 2.12.1 ed25519 1.4.1 gnat 1.7.1 jason 1.4.1 nimble_parsec 1.4.0 nkeys 0.2.2 telemetry 1.2.1 * Getting gnat (Hex package) * Getting jason (Hex package) * Getting cowlib (Hex package) * creating /root/.mix/elixir/1-15/rebar3 * Getting nimble_parsec (Hex package) * Getting nkeys (Hex package) * Getting telemetry (Hex package) * Getting ed25519 (Hex package) You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more ==> ed25519 Compiling 2 files (.ex) Generated ed25519 app ==> nkeys Compiling 2 files (.ex) Generated nkeys app ==> nimble_parsec Compiling 4 files (.ex) Generated nimble_parsec app ===> Analyzing applications... ===> Compiling telemetry ==> jason Compiling 10 files (.ex) Generated jason app ===> Analyzing applications... ===> Compiling cowlib ==> gnat Compiling 11 files (.ex) Generated gnat app 18:23:21.015 [debug] connecting to %{port: 4222, host: "nats"} Demo request received message: "input data" First result: This is a demo Demo request received message: "more data" Second result: This is a demo Failure result: An error occurred: "something went wrong!"
Set up the dependencies for this script. Ordinarily you would have this set of dependencies declared in your
mix.exs
file.