Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
- The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
- There is a network interruption where the message is ultimately dropped
- Messages are published to subjects which can be one or more concrete tokens, e.g.
greet.bob
. Subscribers can utilize wildcards to show interest on a set of matching subjects.
Code
Mix.install([
For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html
{:gnat, "~> 1.6"},
{:jason, "~> 1.0"}
])
url = System.get_env("NATS_URL", "nats://127.0.0.1:4222")
uri = URI.parse(url)
Call start_link
on Gnat
to start the Gnat application supervisor
{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})
Manual subscriptions are easy and straightforward, just supply a topic and the target
pid to receive the {:msg, m}
messages from the subscription.
{:ok, subscription} = Gnat.sub(gnat, self(), "nbe.*")
Here we send a message to a subject that has a subscriber
:ok = Gnat.pub(gnat, "nbe.news", "NATS by example is a great learning resource!")
In elixir, an explicit receive
call blocks the current process until a message
arrives in its inbox. In this case, we’re waiting for the {:msg, m}
tuple from
the NATS client.
receive do
{:msg, %{body: body, topic: "nbe.news", reply_to: nil}} ->
IO.puts("Manual subscription received: '#{body}'")
end
Now let’s move on to more resilient and production-grade ways of subscribing
In addition to one-off subscriptions, you can create a resilient consumer
supervisor that you intend to keep running for a long period of time that
will survive network partition events. This consumer supervisor can invoke
callbacks in a module that conforms to the Gnat.Server
behavior, like this
DemoServer
module.
defmodule DemoServer do
use Gnat.Server
def request(%{body: body, topic: topic}) do
IO.puts("Received message on '#{topic}': '#{body}'")
:ok
end
The error handler is an optional callback. Gnat.Server
has a default one that you
can use.
def error(%{gnat: gnat, reply_to: reply_to}, _error) do
Gnat.pub(gnat, reply_to, "Something went wrong and I can't handle your message")
end
end
The Gnat.ConnectionSupervisor
is a process that monitors your NATS connection. If connection
is lost, this process will retry according to its backoff settings to re-establish a connection.
gnat_supervisor_settings = %{
name: :gnat,
backoff_period: 4_000,
connection_settings: [
%{host: uri.host, port: uri.port}
]
}
{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)
The connection supervisor’s start_link
establishes a connection asynchronously, so we need to
delay here until the connection is running. This isn’t normally a problem when putting connection
supervisors into a supervision tree at startup
if Process.whereis(:gnat) == nil do
Process.sleep(300)
end
Consumer supervisors work in tandem with connection supervisors. The connection_name
setting
refers to the name of a supervised connection, and not the Gnat
application.
consumer_supervisor_settings = %{
connection_name: :gnat,
This is the module name of a module that exhibits the Gnat.Server
behavior
module: DemoServer,
We can subscribe on multiple topics, each of which can have wildcards
subscription_topics: [
%{topic: "rpc.demo", queue_group: "demo"},
],
}
In most applications the connection and consumer supervisors are started as part of the
supervision tree, but for this sample we just create it manually via start_link
.
{:ok, _sup} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)
IO.puts("Started consumer supervisor")
This publishes on the topic on which our consumer supervisor is listening.
Gnat.pub(:gnat, "rpc.demo", "hello")
Output
* creating /root/.mix/archives/hex-2.0.6 Resolving Hex dependencies... Resolution completed in 0.042s New: cowlib 2.12.1 ed25519 1.4.1 gnat 1.7.1 jason 1.4.1 nimble_parsec 1.3.1 nkeys 0.2.2 telemetry 1.2.1 * Getting gnat (Hex package) * Getting jason (Hex package) * Getting cowlib (Hex package) * creating /root/.mix/elixir/1-15/rebar3 * Getting nimble_parsec (Hex package) * Getting nkeys (Hex package) * Getting telemetry (Hex package) * Getting ed25519 (Hex package) You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more ==> ed25519 Compiling 2 files (.ex) Generated ed25519 app ==> nkeys Compiling 2 files (.ex) Generated nkeys app ==> nimble_parsec Compiling 4 files (.ex) Generated nimble_parsec app ===> Analyzing applications... ===> Compiling telemetry ==> jason Compiling 10 files (.ex) Generated jason app ===> Analyzing applications... ===> Compiling cowlib ==> gnat Compiling 11 files (.ex) Generated gnat app Manual subscription received: 'NATS by example is a great learning resource!' 10:49:27.682 [debug] connecting to %{port: 4222, host: "nats"} Started consumer supervisor Received message on 'rpc.demo': 'hello'
Set up the dependencies for this script. Ordinarily you would have this set of dependencies declared in your
mix.exs
file.