Migration to new JetStream API in JetStream
The new JetStream API provides simplified semantics for JetStream asset
management and message consumption. It removes the complexity of Subscribe()
in favor of more explicit separation of creating consumers and consuming messages.
Additionally, the new API focuses on using Pull Consumers as the primary means of consuming messages from a stream. While the legacy API only supported pull consumers in a limited capacity (it was not possible to retrieve messages from a stream in a continuous fashion), the new API provides a more robust set of features to allow for more flexible and performant message consumption.
With the introduction of the Consume, Fetch and Next methods, users have the freedom to choose how they want to consume messages, depending on their use case.
This example demonstrates how to migrate from the legacy API to the new API.
$ nbe run jetstream/api-migration/deno
Code
import {
AckPolicy,
connect,
consumerOpts,
} from "https://deno.land/x/nats@v1.16.0/src/mod.ts";
Get the passed NATS_URL or fall back to the default. This can be a comma-separated string. If it is not defined, the client defaults to localhost:4222. In Node, you would read the environment with process.env instead:
// const servers = process.env.NATS_URL?.split(",");
const servers = Deno.env.get("NATS_URL")?.split(",");
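A plain split(",") keeps stray whitespace and empty entries (for example from a trailing comma). The following is a minimal sketch of a more forgiving parser; parseServers is a hypothetical helper name, not part of the nats library, and it takes the raw value as an argument so it works the same way in Deno and Node.

```typescript
// Hypothetical helper (not part of the nats library): turn a raw NATS_URL
// value into a list of servers, or undefined so the client can fall back
// to its default server.
function parseServers(raw: string | undefined): string[] | undefined {
  if (raw === undefined) {
    return undefined;
  }
  const servers = raw
    .split(",")
    .map((s) => s.trim()) // tolerate "a, b" style lists
    .filter((s) => s.length > 0); // drop empty entries such as a trailing comma
  return servers.length > 0 ? servers : undefined;
}

console.log(parseServers("nats://a:4222, nats://b:4222"));
console.log(parseServers(undefined));
```

The result can be passed as the servers option in the same way as the value computed above.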
Create a client connection to an available NATS server.
const nc = await connect({ servers });
Resource creation has not changed. To create a stream and consumers, create a JetStream Manager context. This context provides the API you use to create those resources.
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "EVENTS",
subjects: ["events.>"],
});
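The stream above captures every subject that matches events.>. As an illustration of how NATS subject wildcards behave (* matches exactly one token, > matches one or more trailing tokens), here is a small standalone sketch. subjectMatches is a hypothetical helper written for this explanation; the server performs the real matching, and this sketch does not validate subjects.

```typescript
// Hypothetical helper (not part of the nats library): check whether a
// subject matches a filter that may contain the "*" and ">" wildcards.
function subjectMatches(filter: string, subject: string): boolean {
  const f = filter.split(".");
  const s = subject.split(".");
  for (let i = 0; i < f.length; i++) {
    if (f[i] === ">") {
      // ">" is only valid as the last token and needs at least one token to match.
      return i === f.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false; // the subject ran out of tokens
    }
    if (f[i] !== "*" && f[i] !== s[i]) {
      return false; // literal token mismatch
    }
  }
  // Without a trailing ">", both must have the same number of tokens.
  return f.length === s.length;
}

console.log(subjectMatches("events.>", "events.7"));
console.log(subjectMatches("events.>", "events"));
```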
To add messages to the stream, create a JetStream context and publish data to the stream.
const js = nc.jetstream();
const proms = Array.from({ length: 20 }).map((_v, idx) => {
return js.publish(`events.${idx}`);
});
await Promise.all(proms);
Processing Messages
Now let’s compare and contrast the new and legacy ways of processing messages.
Legacy Push Subscribe
Previously, the easiest way to continuously receive messages was to use a push consumer. The subscribe() API call was intended for these push consumers. These looked very natural to NATS users.
The legacy subscribe() variants relied on consumer options being provided. These options defined the consumer to use. If the consumer didn’t exist, it would be created; if it did exist and the options were different, the consumer would be updated.
let opts = consumerOpts()
.deliverTo("eventprocessing")
.ackExplicit()
.manualAck();
The subscribe call automatically creates a consumer that matches the specified options, and returns an async iterator with the messages from the stream. If no messages are available, the loop will wait.
You can check whether the stream currently has more messages by checking the number of pending messages, and break if you are done. Typically your code will simply wait until new messages become available.
const pushSub = await js.subscribe("events.>", opts);
for await (const m of pushSub) {
console.log(`legacy push subscriber received ${m.subject}`);
m.ack();
if (m.info.pending === 0) {
break;
}
}
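The "process, ack, stop when nothing is pending" loop recurs throughout this example. As a sketch, it can be factored into a helper that accepts any async iterable of messages. PendingMsg and ackUntilDrained are hypothetical names introduced here; the interface lists only the fields the loop above already uses (subject, info.pending, ack()).

```typescript
// Minimal structural type with just the fields used by the loop above.
// Hypothetical: not a type exported by the nats library.
interface PendingMsg {
  subject: string;
  info: { pending: number };
  ack(): void;
}

// Hypothetical helper: handle and ack each message, stopping once a message
// reports that no further messages are pending. Returns the number handled.
async function ackUntilDrained(
  messages: AsyncIterable<PendingMsg>,
  onMessage: (m: PendingMsg) => void,
): Promise<number> {
  let processed = 0;
  for await (const m of messages) {
    onMessage(m);
    m.ack();
    processed++;
    if (m.info.pending === 0) {
      break;
    }
  }
  return processed;
}
```

Assuming the subscription's messages expose these fields, as the loop above relies on, a call would look like ackUntilDrained(pushSub, (m) => console.log(m.subject)).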
destroy() deletes the consumer! This is not really necessary on ephemeral consumers, since the server will destroy them after a specified period of inactivity. If you know the consumer is not going to be needed, destroying it will help with resource management. Durable consumers are not deleted automatically and will consume resources forever if not managed.
await pushSub.destroy();
Legacy Pull Subscription
The above is quite easy. However, for streams that contain a huge number of messages, it required that you set up other options; if you didn’t, you could run into issues such as slow consumers, or end up with a consumer that cannot be horizontally scaled.
To prevent those issues, the legacy API also provided pullSubscribe(), which effectively avoided the issues of push by enabling the client to request the number of messages it wanted to process:
opts = consumerOpts().ackExplicit().manualAck();
pullSubscribe() would create a subscription to process messages received from the stream, but required a pull() to trigger a request on the server to yield messages:
const pullSub = await js.pullSubscribe("events.>", opts);
const done = (async () => {
for await (const m of pullSub) {
console.log(`legacy pull subscriber received ${m.subject}`);
m.ack();
if (m.info.pending === 0) {
return;
}
}
})();
To get messages flowing, you called pull() on the subscription to start.
pullSub.pull({ batch: 15, no_wait: true });
You also had to do so at some interval to keep messages flowing. Unfortunately, no coordination was provided between the processing of the messages and the triggering of the pulls.
const timer = setInterval(() => {
pullSub.pull({ batch: 15, no_wait: true });
}, 1000);
await done;
clearInterval(timer);
New JetStream Processing API
The new API doesn’t automatically create or update consumers. This is something that the JetStreamManager API does rather well. Instead, you simply use JetStreamManager to create your consumer:
await jsm.consumers.add("EVENTS", {
name: "my-ephemeral",
ack_policy: AckPolicy.Explicit,
});
To process messages, you retrieve the consumer by specifying the stream name and consumer name. If the consumer doesn’t exist, this call will reject. Note that only pull consumers are supported. If your existing consumer is a push consumer, you will have to recreate it as a pull consumer (by not specifying a deliver_subject option on the consumer configuration, and not specifying deliverTo() as an option):
const consumerA = await js.consumers.get("EVENTS", "my-ephemeral");
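When migrating, it can help to check up front whether an existing consumer configuration describes a push or a pull consumer. The sketch below encodes the rule stated above: a configuration with a deliver_subject is a push consumer. ConsumerConfigLike and isPullConsumerConfig are hypothetical names, and the interface lists only the fields relevant here.

```typescript
// Hypothetical subset of a consumer configuration; only the fields
// discussed in the text are modeled.
interface ConsumerConfigLike {
  name?: string;
  durable_name?: string;
  deliver_subject?: string;
}

// Hypothetical helper: a consumer is treated as a pull consumer when no
// deliver_subject is set on its configuration.
function isPullConsumerConfig(cfg: ConsumerConfigLike): boolean {
  return cfg.deliver_subject === undefined || cfg.deliver_subject === "";
}

console.log(isPullConsumerConfig({ name: "my-ephemeral" }));
console.log(isPullConsumerConfig({ durable_name: "old", deliver_subject: "eventprocessing" }));
```

One plausible use, assuming the configuration is available from the consumer info returned by the JetStream Manager, is to run this check before calling js.consumers.get() and recreate the consumer if it reports a push configuration.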
With a consumer in hand, you can now retrieve messages - in different ways. The different ways of getting messages from the consumers are there to help you align the buffering requirements of your application with what the client is doing.
Consuming Messages
First, we’ll discuss consume. This is analogous to the push consumer example above: the consumer will yield messages from the stream to match any buffering options specified on the call. The defaults are safe; however, you can ask for as many messages as you will be able to process within your ack window. As you consume messages, the library will retrieve more messages for you. Under the hood this is actually a pull consumer, but the library manages the pull requests for you.
const messages = await consumerA.consume({ max_messages: 5000 });
for await (const m of messages) {
console.log(`consume received ${m.subject}`);
m.ack();
if (m.info.pending === 0) {
break;
}
}
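To make "the library will retrieve more messages for you" concrete, here is a purely illustrative in-memory model of a consumer that tops up its buffer as messages are handled. It is not the library's implementation: FakeStream, consumeWithRefill and the half-full refill threshold are assumptions made for this sketch, and the source does not describe the actual refill policy.

```typescript
// In-memory stand-in for a stream. Hypothetical: not part of the nats library.
class FakeStream {
  private nextSeq = 0;
  private readonly total: number;
  pulls = 0;

  constructor(total: number) {
    this.total = total;
  }

  // Return up to `batch` subjects, the way a pull request returns a batch.
  pull(batch: number): string[] {
    this.pulls++;
    const out: string[] = [];
    while (out.length < batch && this.nextSeq < this.total) {
      out.push(`events.${this.nextSeq++}`);
    }
    return out;
  }
}

// Hypothetical sketch: keep at most `maxMessages` buffered and request more
// whenever the buffer drops to half of that (an assumed threshold).
function consumeWithRefill(
  stream: FakeStream,
  maxMessages: number,
  handler: (subject: string) => void,
): number {
  const threshold = Math.floor(maxMessages / 2);
  let buffer = stream.pull(maxMessages);
  let handled = 0;
  while (buffer.length > 0) {
    handler(buffer.shift() as string);
    handled++;
    if (buffer.length <= threshold) {
      // Ask only for as many messages as there is room for.
      buffer = buffer.concat(stream.pull(maxMessages - buffer.length));
    }
  }
  return handled;
}

const demoStream = new FakeStream(20);
console.log(consumeWithRefill(demoStream, 8, () => {}), "messages handled");
```

In this model the loop ends when the stream is exhausted. A real consumer would instead wait for new messages, which is why the example above breaks explicitly when pending reaches 0.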
If you want to preemptively delete the consumer, you can. However, this is something you should do only if you know you are not going to need that consumer to resume processing.
await consumerA.delete();
Let’s create a new consumer, this time a durable one:
await jsm.consumers.add("EVENTS", {
durable_name: "my-durable",
ack_policy: AckPolicy.Explicit,
});
Processing Single Messages
Some clients, such as services, typically only need to worry about processing a single message at a time. The idea is that, instead of optimizing a client to pull many messages for processing, you can horizontally scale the number of processes that each work on just one message.
Legacy Pull
The legacy API provided pull() as a way of retrieving a single message:
const m = await js.pull("EVENTS", "my-durable").catch((err) => {
console.log(err.message);
return null;
});
if (m === null) {
console.log("legacy pull got no messages");
} else {
console.log(`jetstream legacy pull: ${m.subject}`);
m.ack();
}
Next
With the new JetStream API we can do the same, but it is now called next(). The API is more ergonomic: if no messages are available, the result will be null.
const consumerB = await js.consumers.get("EVENTS", "my-durable");
consumerB
.next()
.then((m) => {
if (m === null) {
console.log("consumer next - no messages available");
} else {
console.log(`consumer next - ${m.subject}`);
m.ack();
}
})
.catch((err) => {
console.error(err.message);
});
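For a service that handles one message per invocation, the null check and the ack can be wrapped in a small helper. This is a sketch under stated assumptions: AckableMsg, NextSource and processOne are hypothetical names, and the interfaces model only what the snippet above uses (a next() that resolves to a message or null, plus subject and ack()).

```typescript
// Minimal structural types for this sketch. Hypothetical: not exported by
// the nats library.
interface AckableMsg {
  subject: string;
  ack(): void;
}

interface NextSource {
  next(): Promise<AckableMsg | null>;
}

// Hypothetical helper: fetch one message, run the handler, and ack only if
// the handler completed without throwing. Resolves to false when no message
// was available.
async function processOne(
  source: NextSource,
  handler: (m: AckableMsg) => void | Promise<void>,
): Promise<boolean> {
  const m = await source.next();
  if (m === null) {
    return false;
  }
  await handler(m);
  m.ack();
  return true;
}
```

Acking after the handler means a failed handler leaves the message unacknowledged, so it remains eligible for redelivery under the consumer's explicit ack policy.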
Processing a Small Batch of Messages
Finally, some clients will want to manage the rate at which they receive messages more explicitly.
Legacy JetStream provided the fetch() API, which returned one or more messages in a single request:
let iter = await js.fetch("EVENTS", "my-durable", { batch: 3, expires: 5000 });
for await (const m of iter) {
console.log(`legacy fetch: ${m.subject}`);
m.ack();
}
The new API also provides the same facilities. Notice that we already retrieved the consumer as consumerB. The batch property is now called max_messages:
iter = await consumerB.fetch({ max_messages: 3, expires: 5000 });
for await (const m of iter) {
console.log(`consumer fetch: ${m.subject}`);
m.ack();
}
await nc.drain();
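A client that controls its own rate often repeats the fetch step until the stream has nothing more to offer. The following sketch shows one way to structure that loop. BatchMsg, FetchSource and drainInBatches are hypothetical names, and the interface models only the fetch({ max_messages, expires }) shape used above; stopping on a short batch is a design choice of this sketch, not library behavior.

```typescript
// Minimal structural types for this sketch. Hypothetical: not exported by
// the nats library.
interface BatchMsg {
  subject: string;
  ack(): void;
}

interface FetchSource {
  fetch(
    opts: { max_messages: number; expires: number },
  ): Promise<AsyncIterable<BatchMsg>>;
}

// Hypothetical helper: fetch fixed-size batches until a batch comes back
// with fewer messages than requested, then return the total handled.
async function drainInBatches(
  source: FetchSource,
  batchSize: number,
  expires: number,
  handler: (m: BatchMsg) => void,
): Promise<number> {
  let total = 0;
  while (true) {
    let got = 0;
    const iter = await source.fetch({ max_messages: batchSize, expires });
    for await (const m of iter) {
      handler(m);
      m.ack();
      got++;
    }
    total += got;
    if (got < batchSize) {
      // A short batch means the request expired or nothing more was available.
      return total;
    }
  }
}
```

Because each batch is fully processed before the next request is made, processing and pulling stay coordinated, which is what the timer-based legacy pull() approach lacked.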
Output
legacy push subscriber received events.0
legacy push subscriber received events.1
legacy push subscriber received events.2
legacy push subscriber received events.3
legacy push subscriber received events.4
legacy push subscriber received events.5
legacy push subscriber received events.6
legacy push subscriber received events.7
legacy push subscriber received events.8
legacy push subscriber received events.9
legacy push subscriber received events.10
legacy push subscriber received events.11
legacy push subscriber received events.12
legacy push subscriber received events.13
legacy push subscriber received events.14
legacy push subscriber received events.15
legacy push subscriber received events.16
legacy push subscriber received events.17
legacy push subscriber received events.18
legacy push subscriber received events.19
legacy pull subscriber received events.0
legacy pull subscriber received events.1
legacy pull subscriber received events.2
legacy pull subscriber received events.3
legacy pull subscriber received events.4
legacy pull subscriber received events.5
legacy pull subscriber received events.6
legacy pull subscriber received events.7
legacy pull subscriber received events.8
legacy pull subscriber received events.9
legacy pull subscriber received events.10
legacy pull subscriber received events.11
legacy pull subscriber received events.12
legacy pull subscriber received events.13
legacy pull subscriber received events.14
legacy pull subscriber received events.15
legacy pull subscriber received events.16
legacy pull subscriber received events.17
legacy pull subscriber received events.18
legacy pull subscriber received events.19
>> consumers framework is beta functionality
consume received events.0
consume received events.1
consume received events.2
consume received events.3
consume received events.4
consume received events.5
consume received events.6
consume received events.7
consume received events.8
consume received events.9
consume received events.10
consume received events.11
consume received events.12
consume received events.13
consume received events.14
consume received events.15
consume received events.16
consume received events.17
consume received events.18
consume received events.19
jetstream legacy pull: events.0
legacy fetch: events.2
consumer next - events.1
legacy fetch: events.3
legacy fetch: events.4
consumer fetch: events.5
consumer fetch: events.6
consumer fetch: events.7