Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
- The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
- There is a network interruption where the message is ultimately dropped
- Messages are published to subjects which can be one or more concrete tokens, e.g.
greet.bob
. Subscribers can utilize wildcards to show interest on a set of matching subjects.
Code
import { connect } from "https://deno.land/x/nats@v1.16.0/src/mod.ts";
Get the passed NATS_URL or fallback to the default. This can be a comma-separated string.
const servers = Deno.env.get("NATS_URL") || "nats://localhost:4222";
Create a client connection to an available NATS server.
const nc = await connect({
servers: servers.split(","),
});
To publish a message, simply provide the subject of the message
and encode the message payload. NATS subjects are hierarchical using
periods as token delimiters. greet
and joe
are two distinct tokens.
nc.publish("greet.bob", "hello");
Now we are going to create a subscription and utilize a wildcard on
the second token. The effect is that this subscription shows interest
in all messages published to a subject with two tokens where the first
is greet
.
let sub = nc.subscribe("greet.*", {max: 3});
const done = (async () => {
for await (const msg of sub) {
console.log(`${msg.string()} on subject ${msg.subject}`);
}
})()
Let’s publish three more messages which will result in the messages being forwarded to the local subscription we have.
nc.publish("greet.joe", "hello");
nc.publish("greet.pam", "hello");
nc.publish("greet.sue", "hello");
This will wait until the above async subscription handler finishes
processing the three messages. Note that the first message to
greet.bob
was not printed. This is because the subscription was
created after the publish. Core NATS provides at-most-once quality
of service (QoS) for active subscriptions.
await done;
Finally we drain the connection which waits for any pending messages (published or in a subscription) to be flushed.
await nc.drain();
Output
hello on subject greet.joe hello on subject greet.pam hello on subject greet.sue
import the library - in node.js
import {connect, etc} from "nats";
or if not doing a module,const {connect, etc} = require("nats");