NATS by Example

Migration to new JetStream API in JetStream

The new JetStream API provides simplified semantics for JetStream asset management and message consumption. It removes the complexity of Subscribe() in favor of more explicit separation of creating consumers and consuming messages.

Additionally, the new API focuses on using Pull Consumers as the primary means of consuming messages from a stream. While the legacy API only supported pull consumers in a limited capacity (it was not possible to retrieve messages from a stream in a continuous fashion), the new API provides a more robust set of features that allow for more flexible and performant message consumption.

With the introduction of Consume, Fetch and Next methods, users have the freedom to choose how they want to consume messages, depending on their use case.

This example demonstrates how to migrate from the legacy API to the new API.

$ nbe run jetstream/api-migration/deno

Code

Import the library. This example runs in Deno. If you are running in Node, import from the "nats" package instead, as shown in the comment:

// Node: import { AckPolicy, connect, consumerOpts } from "nats";
import {
  AckPolicy,
  connect,
  consumerOpts,
} from "https://deno.land/x/nats@v1.16.0/src/mod.ts";

Get the passed NATS_URL or fall back to the default. This can be a comma-separated string. If it is not defined, the client defaults to localhost:4222. In Node, you can read the environment through process.env instead, as shown in the comment:

// Node: const servers = process.env.NATS_URL?.split(",");
const servers = Deno.env.get("NATS_URL")?.split(",");
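
The lookup above leaves the server list undefined when NATS_URL is unset, which lets connect() use its default. As a minimal sketch, the same idea can be wrapped in a helper that also trims whitespace and drops empty entries. The name parseServers is hypothetical and not part of the nats library:

```javascript
// Hypothetical helper (not part of the nats library): turn a
// comma-separated NATS_URL value into a list of server addresses.
function parseServers(value) {
  if (!value) {
    // Leave the list undefined so connect() falls back to its default.
    return undefined;
  }
  const servers = value
    .split(",")
    .map((s) => s.trim())
    .filter((s) => s.length > 0);
  // An input made only of separators and spaces counts as "not set".
  return servers.length > 0 ? servers : undefined;
}

console.log(parseServers("nats://a:4222, nats://b:4222"));
console.log(parseServers(undefined));
```

The result could then be passed as connect({ servers: parseServers(value) }), where value comes from whichever environment accessor your runtime provides.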

Create a client connection to an available NATS server.

const nc = await connect({ servers });

Resource creation has not changed. To create a stream and consumers, create a JetStream Manager context. This context provides the API you use to create those resources:

const jsm = await nc.jetstreamManager();
await jsm.streams.add({
  name: "EVENTS",
  subjects: ["events.>"],
});

To add messages to the stream, create a JetStream context and publish data to the stream:

const js = nc.jetstream();
const proms = Array.from({ length: 20 }).map((_v, idx) => {
  return js.publish(`events.${idx}`);
});
await Promise.all(proms);
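
Promise.all rejects as soon as any single publish fails, which hides how many of the other publishes succeeded. The following is a minimal sketch of an alternative that reports failures per subject. The helper name publishAll is hypothetical, and publish is assumed to be any function that returns a promise, for example (subject) => js.publish(subject):

```javascript
// Hypothetical helper: publish to every subject and report failures
// instead of rejecting on the first one. `publish` is any function that
// returns a promise, for example (subject) => js.publish(subject).
async function publishAll(publish, subjects) {
  // allSettled waits for every publish, whether it resolved or rejected.
  const results = await Promise.allSettled(subjects.map((s) => publish(s)));
  const failed = [];
  results.forEach((r, idx) => {
    if (r.status === "rejected") {
      failed.push(subjects[idx]);
    }
  });
  return { sent: subjects.length - failed.length, failed };
}
```

Whether you prefer this over Promise.all depends on whether a partial publish is acceptable in your application.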

Processing Messages

Now let’s compare and contrast the new and legacy ways of processing messages.

Legacy Push Subscribe

Previously, the easiest way to continuously receive messages was to use a push consumer. The subscribe() API call was intended for these push consumers, which looked very natural to NATS users.

The legacy subscribe() variants relied on consumer options being provided. These options defined the consumer to use. If the consumer didn’t exist, it would be created; if it did exist and the options were different, the consumer would be updated:

let opts = consumerOpts()
  .deliverTo("eventprocessing")
  .ackExplicit()
  .manualAck();

The subscribe call automatically creates a consumer that matches the specified options, and returns an async iterator with the messages from the stream. If no messages are available, the loop will wait.

You can check if the stream currently has more messages by checking the number of pending messages, and break if you are done - typically your code will simply wait until new messages become available.

const pushSub = await js.subscribe("events.>", opts);
for await (const m of pushSub) {
  console.log(`legacy push subscriber received ${m.subject}`);
  m.ack();
  if (m.info.pending === 0) {
    break;
  }
}

destroy() deletes the consumer! This is not strictly necessary for ephemeral consumers, since the server will delete them after a specified period of inactivity. If you know the consumer is not going to be needed, destroying it helps with resource management. Durable consumers are not deleted automatically and will consume resources forever if not managed.

await pushSub.destroy();

Legacy Pull Subscription

The above is quite easy. However, for streams that contain a huge number of messages, it required you to set up other options. If you didn’t, you could run into issues such as slow consumers, or end up with a consumer that cannot be horizontally scaled.

To prevent those issues, the legacy API also provided a pullSubscribe(), which effectively avoided the issues of push by enabling the client to request the number of messages it wanted to process:

opts = consumerOpts().ackExplicit().manualAck();

pullSubscribe() would create a subscription to process messages received from the stream, but required a pull() to trigger a request on the server to yield messages:

const pullSub = await js.pullSubscribe("events.>", opts);
const done = (async () => {
  for await (const m of pullSub) {
    console.log(`legacy pull subscriber received ${m.subject}`);
    m.ack();
    if (m.info.pending === 0) {
      return;
    }
  }
})();

To get messages flowing, you called pull() on the subscription:

pullSub.pull({ batch: 15, no_wait: true });

You also had to do so at some interval to keep messages flowing. Unfortunately, the API provided no coordination between the processing of the messages and the triggering of the pulls.

const timer = setInterval(() => {
  pullSub.pull({ batch: 15, no_wait: true });
}, 1000);


await done;
clearInterval(timer);
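
To illustrate what coordination between pulls and processing means, here is a conceptual sketch that only requests the next batch once the previous one has been fully handled, rather than pulling on a timer. It does not use the NATS client: pullBatch and handle are hypothetical stand-ins for "request up to n messages" and "process one message":

```javascript
// Conceptual sketch only: `pullBatch` is a hypothetical stand-in for
// "request up to n messages and resolve with what arrived".
async function drainInBatches(pullBatch, handle, batchSize) {
  let processed = 0;
  while (true) {
    const batch = await pullBatch(batchSize);
    if (batch.length === 0) {
      // Nothing left right now, stop instead of pulling on a timer.
      return processed;
    }
    for (const m of batch) {
      await handle(m);
      processed++;
    }
    // The next pull is only issued here, after the whole batch is handled.
  }
}
```

With the timer-based approach above, a slow handler and a fast timer can drift apart. Tying each request to the completion of the previous batch avoids that.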

New JetStream Processing API

The new API doesn’t automatically create or update consumers. This is something that the JetStreamManager API does rather well. Instead, you simply use JetStreamManager to create your consumer:

await jsm.consumers.add("EVENTS", {
  name: "my-ephemeral",
  ack_policy: AckPolicy.Explicit,
});

To process messages, you retrieve the consumer by specifying the stream name and the consumer name. If the consumer doesn’t exist, this call will reject. Note that only pull consumers are supported. If your existing consumer is a push consumer, you will have to recreate it as a pull consumer (that is, without a deliver_subject option in the consumer configuration and without deliverTo() as an option):

const consumerA = await js.consumers.get("EVENTS", "my-ephemeral");
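
When migrating existing consumers, it can help to check which ones are push consumers before trying to use them with the new API. The sketch below applies the rule stated above to a consumer configuration object. The helper name isPullConsumer is hypothetical, and it assumes the configuration uses the deliver_subject field mentioned in the text:

```javascript
// Hypothetical helper: a consumer configuration describes a push consumer
// when it sets deliver_subject, and a pull consumer otherwise.
function isPullConsumer(config) {
  return config.deliver_subject === undefined || config.deliver_subject === "";
}

console.log(isPullConsumer({ name: "my-ephemeral" }));
console.log(isPullConsumer({ deliver_subject: "eventprocessing" }));
```

A helper like this could be applied to the configuration of each existing consumer to decide which ones need to be recreated.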

With a consumer in hand, you can now retrieve messages - in different ways. The different ways of getting messages from the consumers are there to help you align the buffering requirements of your application with what the client is doing.

Consuming Messages

First, we’ll discuss consume(). It is analogous to the push consumer example above: the consumer yields messages from the stream according to any buffering options specified on the call. The defaults are safe, but you can ask for as many messages as you will be able to process within your ack window. As you consume messages, the library retrieves more messages for you. Under the hood this is still a pull consumer, but the library manages the pulls for you.

const messages = await consumerA.consume({ max_messages: 5000 });
for await (const m of messages) {
  console.log(`consume received ${m.subject}`);
  m.ack();
  if (m.info.pending === 0) {
    break;
  }
}
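
To build intuition for "the library will retrieve more messages for you", here is a simplified model of a buffer that is topped up as messages are processed. This is not the library's implementation. The function name simulateConsume and the refill-at-half rule are assumptions made purely for illustration:

```javascript
// Simplified model, not the library's implementation: keep up to
// maxMessages buffered and request more once half have been handed out.
function simulateConsume(total, maxMessages) {
  const threshold = Math.floor(maxMessages / 2);
  let remaining = total; // messages still on the stream
  let buffered = 0; // messages requested but not yet processed
  let requests = 0; // pull requests issued
  let processed = 0;

  while (processed < total) {
    if (buffered <= threshold && remaining > 0) {
      // Top the buffer back up to maxMessages.
      const want = Math.min(maxMessages - buffered, remaining);
      remaining -= want;
      buffered += want;
      requests++;
    }
    // Process one buffered message.
    buffered--;
    processed++;
  }
  return { processed, requests };
}

console.log(simulateConsume(20, 8));
```

The point of the model is that the application only iterates over messages, while the number and timing of the underlying requests follow from the buffering options.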

If you want to preemptively delete the consumer, you can. However, you should only do this if you know you will not need that consumer to resume processing.

await consumerA.delete();

Let’s create a new consumer, this time a durable one:

await jsm.consumers.add("EVENTS", {
  durable_name: "my-durable",
  ack_policy: AckPolicy.Explicit,
});

Processing Single Messages

Some clients, such as services, typically only need to worry about processing a single message at a time. The idea is that, instead of optimizing a client to pull many messages for processing, you can horizontally scale the number of processes that each work on just one message.

Legacy Pull

The legacy API provided pull() as a way of retrieving a single message:

const m = await js.pull("EVENTS", "my-durable").catch((err) => {
  console.log(err.message);
  return null;
});


if (m === null) {
  console.log("legacy pull got no messages");
} else {
  console.log(`jetstream legacy pull: ${m.subject}`);
  m.ack();
}

Next

With the new JetStream API we can do the same, but it is now called next(). The API is more ergonomic: if no message is available, it resolves to null.

const consumerB = await js.consumers.get("EVENTS", "my-durable");
consumerB
  .next()
  .then((m) => {
    if (m === null) {
      console.log("consumer next - no messages available");
    } else {
      console.log(`consumer next - ${m.subject}`);
      m.ack();
    }
  })
  .catch((err) => {
    console.error(err.message);
  });
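
The promise chain above is not awaited, which is why "consumer next" appears after the first "legacy fetch" line in the output below. As a minimal sketch, the same logic can be written with async/await so that the caller can wait for it. The function name processOne is hypothetical, and consumer is assumed to be any object whose next() resolves to a message or to null:

```javascript
// Sketch: `consumer` is any object whose next() resolves to a message
// or to null, as described above. Returns the subject that was handled.
async function processOne(consumer) {
  try {
    const m = await consumer.next();
    if (m === null) {
      console.log("consumer next - no messages available");
      return null;
    }
    console.log(`consumer next - ${m.subject}`);
    m.ack();
    return m.subject;
  } catch (err) {
    // Mirror the example above: log the error and carry on.
    console.error(err.message);
    return null;
  }
}
```

Calling await processOne(consumerB) would then finish handling the message before the next statement runs.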

Processing a Small Batch of Messages

Finally, some clients will want to manage the rate at which they receive messages more explicitly.

Legacy JetStream provided the fetch() API which returned one or more messages in a single request:

let iter = await js.fetch("EVENTS", "my-durable", { batch: 3, expires: 5000 });
for await (const m of iter) {
  console.log(`legacy fetch: ${m.subject}`);
  m.ack();
}

The new API provides the same facility. Notice that we already retrieved the consumer as consumerB. The batch property is now called max_messages:

iter = await consumerB.fetch({ max_messages: 3, expires: 5000 });
for await (const m of iter) {
  console.log(`consumer fetch: ${m.subject}`);
  m.ack();
}
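
If you have many legacy fetch() call sites, the rename can be captured in one place. The sketch below translates the two options used in this example. The helper name toFetchOptions is hypothetical, and any other legacy options would need to be mapped separately:

```javascript
// Hypothetical helper: translate the legacy fetch options used above
// into the shape the new fetch() call takes. Only the two options that
// appear in this example are handled.
function toFetchOptions(legacy) {
  const opts = {};
  if (legacy.batch !== undefined) {
    opts.max_messages = legacy.batch; // batch was renamed to max_messages
  }
  if (legacy.expires !== undefined) {
    opts.expires = legacy.expires; // expires keeps its name
  }
  return opts;
}

console.log(toFetchOptions({ batch: 3, expires: 5000 }));
```

The result could be passed straight to the new call, for example consumerB.fetch(toFetchOptions({ batch: 3, expires: 5000 })).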


await nc.drain();

Output

legacy push subscriber received events.0
legacy push subscriber received events.1
legacy push subscriber received events.2
legacy push subscriber received events.3
legacy push subscriber received events.4
legacy push subscriber received events.5
legacy push subscriber received events.6
legacy push subscriber received events.7
legacy push subscriber received events.8
legacy push subscriber received events.9
legacy push subscriber received events.10
legacy push subscriber received events.11
legacy push subscriber received events.12
legacy push subscriber received events.13
legacy push subscriber received events.14
legacy push subscriber received events.15
legacy push subscriber received events.16
legacy push subscriber received events.17
legacy push subscriber received events.18
legacy push subscriber received events.19
legacy pull subscriber received events.0
legacy pull subscriber received events.1
legacy pull subscriber received events.2
legacy pull subscriber received events.3
legacy pull subscriber received events.4
legacy pull subscriber received events.5
legacy pull subscriber received events.6
legacy pull subscriber received events.7
legacy pull subscriber received events.8
legacy pull subscriber received events.9
legacy pull subscriber received events.10
legacy pull subscriber received events.11
legacy pull subscriber received events.12
legacy pull subscriber received events.13
legacy pull subscriber received events.14
legacy pull subscriber received events.15
legacy pull subscriber received events.16
legacy pull subscriber received events.17
legacy pull subscriber received events.18
legacy pull subscriber received events.19
 >> consumers framework is beta functionality 
consume received events.0
consume received events.1
consume received events.2
consume received events.3
consume received events.4
consume received events.5
consume received events.6
consume received events.7
consume received events.8
consume received events.9
consume received events.10
consume received events.11
consume received events.12
consume received events.13
consume received events.14
consume received events.15
consume received events.16
consume received events.17
consume received events.18
consume received events.19
jetstream legacy pull: events.0
legacy fetch: events.2
consumer next - events.1
legacy fetch: events.3
legacy fetch: events.4
consumer fetch: events.5
consumer fetch: events.6
consumer fetch: events.7
