NATS Logo by Example

Core Publish-Subscribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects which can be one or more concrete tokens, e.g. greet.bob. Subscribers can utilize wildcards to show interest on a set of matching subjects.
CLI Go Python JavaScript Rust C# .NET V2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run messaging/pub-sub/rust
View the source code or learn how to run this example yourself

Code

use futures::StreamExt;
use std::{env, str::from_utf8};


#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {

Use the NATS_URL env variable if defined, otherwise fallback to the default.

    let nats_url = env::var("NATS_URL")
        .unwrap_or_else(|_| "nats://localhost:4222".to_string());


    let client = async_nats::connect(nats_url).await?;

Publish a message to the subject greet.joe.

    client.publish("greet.joe", "hello".into()).await?;

Subscriber implements Rust iterator, so we can leverage combinators like take() to limit the messages intended to be consumed for this interaction.

    let mut subscription =
        client.subscribe("greet.*").await?.take(3);

Publish to three different subjects matching the wildcard.

    for subject in ["greet.sue", "greet.bob", "greet.pam"] {
        client.publish(subject, "hello".into()).await?;
    }

Notice that the first message received is greet.sue and not greet.joe which was the first message published. This is because core NATS provides at-most-once quality of service (QoS). Subscribers must be connected showing interest in a subject for the server to relay the message to the client.

    while let Some(message) = subscription.next().await {
        println!(
            "{:?} received on {:?}",
            from_utf8(&message.payload),
            &message.subject
        );
    }


    Ok(())
}

Output

Ok("hello") received on "greet.sue"
Ok("hello") received on "greet.bob"
Ok("hello") received on "greet.pam"

Recording

Note, playback is half speed to make it a bit easier to follow.