NATS by Example

Pull Consumers in JetStream

A pull consumer lets the application fetch one or more messages on demand using a subscription bound to the consumer. This gives the application control over the flow of incoming messages, so it can process and ack them in an appropriate amount of time.

A consumer can be either durable or ephemeral. A durable consumer has its state tracked on the server, most importantly the last message acknowledged by the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes. The server automatically cleans up (deletes) the consumer after a period of inactivity.

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

Note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. If you need deterministic partitioning for scalable ordered processing, see the NATS documentation on subject mapping and partitioning.
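
As a rough sketch of the sharing described above, the program below starts two workers that fetch from the same pull consumer concurrently. It reuses only calls that appear in the example on this page (CreateStreamAsync, CreateConsumerAsync, FetchAsync, AckAsync). The stream name WORK, the work.> subjects, the consumer name workers, and the WorkerAsync helper are hypothetical names chosen for illustration, and a JetStream-enabled server reachable through NATS_URL is assumed.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
await using var nats = new NatsConnection(new NatsOpts { Url = url });
var js = new NatsJSContext(nats);

// Hypothetical stream and subjects, chosen so they do not overlap with EVENTS.
var stream = await js.CreateStreamAsync(new StreamConfig("WORK", new[] { "work.>" }));
for (var i = 1; i <= 6; i++)
    await js.PublishAsync(subject: $"work.{i}", data: $"job-{i}");

// One durable pull consumer shared by every worker.
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("workers"));

// Each call issues its own fetch against the shared consumer.
async Task<int> WorkerAsync(string name)
{
    var handled = 0;
    var opts = new NatsJSFetchOpts { MaxMsgs = 3, Expires = TimeSpan.FromSeconds(1) };
    await foreach (var msg in consumer.FetchAsync<string>(opts: opts))
    {
        Console.WriteLine($"{name} got {msg.Subject}: {msg.Data}");
        await msg.AckAsync();
        handled++;
    }
    return handled;
}

// Run both workers concurrently and wait for both batches to finish.
var results = await Task.WhenAll(WorkerAsync("worker-a"), WorkerAsync("worker-b"));
Console.WriteLine($"Handled {results.Sum()} messages in total");

// Durable consumers must be removed explicitly.
await stream.DeleteConsumerAsync("workers");
```

Which worker receives which messages depends on timing, so the split between the two is not guaranteed. Only the order within each batch is.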

$ nbe run jetstream/pull-consumer/dotnet2

Code

Install the NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.

using System.Diagnostics;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the locations of the NATS servers. If it is not set, the example falls back to 127.0.0.1:4222.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. Because the connection is declared with await using, it is disposed at the end of the scope, which flushes the buffers and closes the connection cleanly.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Access JetStream for managing streams and consumers as well as for publishing and consuming messages to and from the stream.

var js = new NatsJSContext(nats);


var streamName = "EVENTS";

Declare a simple limits-based stream.

var stream = await js.CreateStreamAsync(new StreamConfig(streamName, new[] { "events.>" }));

Publish a few messages for the example.

await js.PublishAsync(subject: "events.1", data: "event-data-1");
await js.PublishAsync(subject: "events.2", data: "event-data-2");
await js.PublishAsync(subject: "events.3", data: "event-data-3");

Create the consumer bound to the previously created stream. If a durable name is not supplied, the consumer is removed once InactiveThreshold (5 seconds by default) elapses without it actively consuming messages. The name is optional; if it is not provided, one is auto-generated. For this example, let’s create the consumer with no options, which makes it ephemeral with an auto-generated name.

var consumer = await stream.CreateConsumerAsync(new ConsumerConfig());

Messages can be consumed continuously in a loop using the ConsumeAsync method. ConsumeAsync accepts various options, but for this example we use the defaults. The break statement stops processing after 3 messages so that this loop does not interfere with the other examples.

var count = 0;
await foreach (var msg in consumer.ConsumeAsync<string>())
{
    await msg.AckAsync();
    logger.LogInformation("received msg on {Subject} with data {Data}", msg.Subject, msg.Data);
    if (++count == 3)
        break;
}

Publish more messages.

await js.PublishAsync(subject: "events.1", data: "event-data-1");
await js.PublishAsync(subject: "events.2", data: "event-data-2");
await js.PublishAsync(subject: "events.3", data: "event-data-3");

We can also fetch messages in batches. The MaxMsgs option sets the batch size, which is the maximum number of messages that should be returned. For this first fetch, we ask for two, and we get them right away since they are already in the stream.

var fetchCount = 0;
await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
    await msg.AckAsync();
    fetchCount++;
}


logger.LogInformation("Got {Count} messages", fetchCount);

FetchAsync returns an async enumerable that completes only when the requested number of messages has been received or the request expires. If we do not want to wait for the rest of the messages and want to quickly return as many messages as are available (up to the provided batch size), we can use FetchNoWaitAsync instead. Here, because we have already received two of the three new messages, we will only get one more. NOTE: FetchNoWaitAsync usage is discouraged since it can cause unnecessary load if not used correctly. For example, in a loop without a backoff it will continuously try to get messages even if there are no new messages in the stream.

fetchCount = 0;
await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 100 }))
{
    await msg.AckAsync();
    fetchCount++;
}
logger.LogInformation("Got {Count} messages", fetchCount);
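
To make the backoff advice concrete, here is a minimal sketch of a polling loop around FetchNoWaitAsync that waits longer after each empty poll. It uses the same cast and fetch call as the example above. The stream name POLL, the poll.> subjects, the delay values, and the rule of stopping after three empty polls are assumptions made for illustration, not recommendations from the library.

```csharp
using System;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
await using var nats = new NatsConnection(new NatsOpts { Url = url });
var js = new NatsJSContext(nats);

// Hypothetical stream and subjects used only by this sketch.
var stream = await js.CreateStreamAsync(new StreamConfig("POLL", new[] { "poll.>" }));
await js.PublishAsync(subject: "poll.1", data: "poll-data-1");
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig());

var initialDelay = TimeSpan.FromMilliseconds(100);
var maxDelay = TimeSpan.FromSeconds(2);
var delay = initialDelay;
var emptyPolls = 0;

// Stop after three consecutive empty polls so the sketch terminates.
while (emptyPolls < 3)
{
    var received = 0;
    await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 10 }))
    {
        await msg.AckAsync();
        received++;
    }

    if (received > 0)
    {
        // Messages are flowing: reset the backoff and poll again immediately.
        emptyPolls = 0;
        delay = initialDelay;
        continue;
    }

    // Nothing available: wait, then double the delay up to the cap.
    emptyPolls++;
    await Task.Delay(delay);
    delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
}

Console.WriteLine("No more messages, stopping");
```

In most cases a plain FetchAsync with a suitable Expires value avoids the need for such a loop, because the server holds the request open until messages arrive or the request expires.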

Finally, if we are at the end of the stream and we call FetchAsync, the call blocks until the “max wait” time elapses. This is the Expires option, which is 30 seconds by default but can be set explicitly. Here we set it to one second.

var fetchStopwatch = Stopwatch.StartNew();
fetchCount = 0;
await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 100, Expires = TimeSpan.FromSeconds(1) }))
{
    await msg.AckAsync();
    fetchCount++;
}
logger.LogInformation("Got {Count} messages in {Elapsed}", fetchCount, fetchStopwatch.Elapsed);

Durable consumers can be created by specifying a durable name. Durable consumers are not removed automatically, regardless of InactiveThreshold. They can be removed by calling DeleteConsumerAsync.

var durable = await stream.CreateConsumerAsync(new ConsumerConfig("processor"));

Consume and fetch work the same way for durable consumers.

await foreach (var msg in durable.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 1 }))
{
    logger.LogInformation("Received {Subject} from durable consumer", msg.Subject);
}

While ephemeral consumers will be removed after InactiveThreshold, durable consumers have to be removed explicitly if no longer needed.

await stream.DeleteConsumerAsync("processor");

Let’s try to get the consumer to make sure it’s gone.

try
{
    await stream.GetConsumerAsync("processor");
}
catch (NatsJSApiException e)
{
    if (e.Error.Code == 404)
    {
        logger.LogInformation("Consumer is gone");
    }
}
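
The same 404 check can be wrapped in a small helper when an application needs to test for a consumer in more than one place. The sketch below is one possible shape, using an exception filter so that errors other than 404 still propagate. ConsumerExistsAsync, the stream name CHECK, and the check.> subjects are hypothetical names. The library calls are the ones already used in this example.

```csharp
using System;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
await using var nats = new NatsConnection(new NatsOpts { Url = url });
var js = new NatsJSContext(nats);

// Hypothetical stream used only by this sketch.
var stream = await js.CreateStreamAsync(new StreamConfig("CHECK", new[] { "check.>" }));

// Returns false only for a 404 API error; any other failure is rethrown.
async Task<bool> ConsumerExistsAsync(string name)
{
    try
    {
        await stream.GetConsumerAsync(name);
        return true;
    }
    catch (NatsJSApiException e) when (e.Error.Code == 404)
    {
        return false;
    }
}

await stream.CreateConsumerAsync(new ConsumerConfig("processor"));
Console.WriteLine($"Before delete: {await ConsumerExistsAsync("processor")}");

await stream.DeleteConsumerAsync("processor");
Console.WriteLine($"After delete: {await ConsumerExistsAsync("processor")}");
```

Filtering on the error code keeps unrelated failures, such as a missing stream or a timeout, from being mistaken for a missing consumer.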

That’s it!

logger.LogInformation("Bye!");

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NCOG3JTGFJ2TBMN7ONAKUXU6L7NSHWOPW2A3NMA265XE22C4QEXWJ545, Name = NCOG3JTGFJ2TBMN7ONAKUXU6L7NSHWOPW2A3NMA265XE22C4QEXWJ545, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 192.168.176.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      received msg on events.1 with data event-data-1
info: NATS-by-Example[0]
      received msg on events.2 with data event-data-2
info: NATS-by-Example[0]
      received msg on events.3 with data event-data-3
info: NATS-by-Example[0]
      Got 2 messages
info: NATS-by-Example[0]
      Got 1 messages
info: NATS-by-Example[0]
      Got 0 messages in 00:00:01.0016194
info: NATS-by-Example[0]
      Received events.1 from durable consumer
info: NATS-by-Example[0]
      Consumer is gone
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
