NATS by Example

Core Publish-Subscribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects, which consist of one or more dot-separated concrete tokens, e.g. greet.bob. Subscribers can use wildcards to express interest in a set of matching subjects.
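
As a rough illustration of how wildcard matching works, the sketch below compares a subscription pattern with a subject token by token. It assumes the standard NATS wildcards: * matches exactly one token and > matches one or more trailing tokens. The SubjectMatch module and its matches?/2 function are hypothetical names for this illustration. They are not part of Gnat, and in practice the server performs the matching.

```elixir
defmodule SubjectMatch do
  # Hypothetical helper, not part of Gnat: the NATS server does the real matching.
  def matches?(pattern, subject) do
    do_match(String.split(pattern, "."), String.split(subject, "."))
  end

  # ">" as the last pattern token matches one or more remaining subject tokens.
  defp do_match([">"], [_ | _]), do: true
  # "*" matches exactly one subject token, whatever it is.
  defp do_match(["*" | pattern], [_ | subject]), do: do_match(pattern, subject)
  # A concrete token must be equal in the pattern and in the subject.
  defp do_match([token | pattern], [token | subject]), do: do_match(pattern, subject)
  # Both lists exhausted at the same time: full match.
  defp do_match([], []), do: true
  # Anything else (length mismatch or differing tokens) is not a match.
  defp do_match(_, _), do: false
end

IO.inspect(SubjectMatch.matches?("nbe.*", "nbe.news"))        # => true
IO.inspect(SubjectMatch.matches?("nbe.*", "nbe.news.today"))  # => false
IO.inspect(SubjectMatch.matches?("nbe.>", "nbe.news.today"))  # => true
```

Under these assumptions, a subscription on nbe.* receives messages published to nbe.news but not to nbe.news.today.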
$ nbe run messaging/pub-sub/elixir

Code

Set up the dependencies for this script. Ordinarily you would have this set of dependencies declared in your mix.exs file.

Mix.install([

For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html

  {:gnat, "~> 1.6"},
  {:jason, "~> 1.0"}
])


url = System.get_env("NATS_URL", "nats://127.0.0.1:4222")
uri = URI.parse(url)

Call Gnat.start_link to start a process that manages a connection to the NATS server.

{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})

Manual subscriptions are straightforward: supply the target pid that should receive the {:msg, m} messages and the topic to subscribe to.

{:ok, subscription} = Gnat.sub(gnat, self(), "nbe.*")

Here we publish a message to a subject that has a subscriber, since nbe.news matches the nbe.* wildcard.

:ok = Gnat.pub(gnat, "nbe.news", "NATS by example is a great learning resource!")

In Elixir, an explicit receive call blocks the current process until a matching message arrives in its mailbox. In this case, we’re waiting for the {:msg, m} tuple from the NATS client.

receive do
  {:msg, %{body: body, topic: "nbe.news", reply_to: nil}} ->
    IO.puts("Manual subscription received: '#{body}'")
end
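
Because delivery is at-most-once, a receive with no timeout would block forever if the message were dropped. A minimal sketch of guarding against that with an after clause follows. To stay runnable without a server, it uses send/2 to place a tuple shaped like the {:msg, m} message in the mailbox. The timeout values are arbitrary and chosen only for illustration.

```elixir
# Stand-in for a message delivered by the NATS client (same shape as in the example above).
send(self(), {:msg, %{body: "hello", topic: "nbe.news", reply_to: nil}})

# A message is waiting in the mailbox, so this returns immediately.
first =
  receive do
    {:msg, %{body: body, topic: topic}} -> {:ok, topic, body}
  after
    1_000 -> :timeout
  end

# The mailbox is now empty, so this receive gives up once the timeout elapses.
second =
  receive do
    {:msg, %{body: body, topic: topic}} -> {:ok, topic, body}
  after
    100 -> :timeout
  end

IO.inspect(first)   # => {:ok, "nbe.news", "hello"}
IO.inspect(second)  # => :timeout
```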

Now let’s move on to more resilient, production-grade ways of subscribing.

In addition to one-off subscriptions, you can create a consumer supervisor: a long-running process that survives network partition events. The consumer supervisor invokes callbacks in a module that implements the Gnat.Server behavior, like this DemoServer module.

defmodule DemoServer do
  use Gnat.Server


  def request(%{body: body, topic: topic}) do
    IO.puts("Received message on '#{topic}': '#{body}'")
    :ok
  end

The error handler is an optional callback. Gnat.Server has a default one that you can use.

  def error(%{gnat: gnat, reply_to: reply_to}, _error) do
    Gnat.pub(gnat, reply_to, "Something went wrong and I can't handle your message")
  end
end

The Gnat.ConnectionSupervisor is a process that monitors your NATS connection. If the connection is lost, this process retries according to its backoff settings until the connection is re-established.

gnat_supervisor_settings = %{
  name: :gnat,
  backoff_period: 4_000,
  connection_settings: [
    %{host: uri.host, port: uri.port}
  ]
}
{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)

The connection supervisor’s start_link establishes the connection asynchronously, so we wait briefly here if the connection is not yet registered. This isn’t normally a problem when connection supervisors are started in a supervision tree at application startup.

if Process.whereis(:gnat) == nil do
  Process.sleep(300)
end
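
A single fixed sleep can be too short on a slow machine. One alternative is to poll until the name is registered, as in the sketch below. WaitFor.registered/3 is a hypothetical helper, not part of Gnat, and the attempt count and interval are arbitrary defaults. So that it runs without a server, the demo registers the current process under the made-up name :demo_conn instead of waiting for a real connection.

```elixir
defmodule WaitFor do
  # Hypothetical helper: polls Process.whereis/1 until the name is registered
  # or the attempts run out.
  def registered(name, attempts \\ 20, interval_ms \\ 50)

  def registered(_name, 0, _interval_ms), do: {:error, :timeout}

  def registered(name, attempts, interval_ms) do
    case Process.whereis(name) do
      nil ->
        # Not registered yet: wait and try again with one fewer attempt.
        Process.sleep(interval_ms)
        registered(name, attempts - 1, interval_ms)

      pid ->
        {:ok, pid}
    end
  end
end

# Stand-in for a named connection: register this process under a made-up name.
Process.register(self(), :demo_conn)

IO.inspect(WaitFor.registered(:demo_conn) == {:ok, self()})  # => true
IO.inspect(WaitFor.registered(:missing_conn, 3, 10))          # => {:error, :timeout}
```

In the script above, the equivalent call would be WaitFor.registered(:gnat), with the result checked before starting the consumer supervisor.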

Consumer supervisors work in tandem with connection supervisors. The connection_name setting refers to the name of a supervised connection, and not the Gnat application.

consumer_supervisor_settings = %{
  connection_name: :gnat,

This is the name of a module that implements the Gnat.Server behavior.

  module: DemoServer,

We can subscribe to multiple topics, each of which can contain wildcards.

  subscription_topics: [
    %{topic: "rpc.demo", queue_group: "demo"},
  ],
}

In most applications the connection and consumer supervisors are started as part of the supervision tree, but for this sample we just create them manually via start_link.

{:ok, _sup} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)
IO.puts("Started consumer supervisor")

This publishes on the topic on which our consumer supervisor is listening.

Gnat.pub(:gnat, "rpc.demo", "hello")

Output

* creating /root/.mix/archives/hex-2.0.6
Resolving Hex dependencies...
Resolution completed in 0.042s
New:
  cowlib 2.12.1
  ed25519 1.4.1
  gnat 1.7.1
  jason 1.4.1
  nimble_parsec 1.3.1
  nkeys 0.2.2
  telemetry 1.2.1
* Getting gnat (Hex package)
* Getting jason (Hex package)
* Getting cowlib (Hex package)
* creating /root/.mix/elixir/1-15/rebar3
* Getting nimble_parsec (Hex package)
* Getting nkeys (Hex package)
* Getting telemetry (Hex package)
* Getting ed25519 (Hex package)
You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more
==> ed25519
Compiling 2 files (.ex)
Generated ed25519 app
==> nkeys
Compiling 2 files (.ex)
Generated nkeys app
==> nimble_parsec
Compiling 4 files (.ex)
Generated nimble_parsec app
===> Analyzing applications...
===> Compiling telemetry
==> jason
Compiling 10 files (.ex)
Generated jason app
===> Analyzing applications...
===> Compiling cowlib
==> gnat
Compiling 11 files (.ex)
Generated gnat app
Manual subscription received: 'NATS by example is a great learning resource!'

10:49:27.682 [debug] connecting to %{port: 4222, host: "nats"}
Started consumer supervisor
Received message on 'rpc.demo': 'hello'

Recording

Note: playback is at half speed to make it a bit easier to follow.