NATS by Example

Work-queue Stream in JetStream

A work-queue retention policy satisfies a very common use case of queuing up messages that are intended to be processed once and only once.

This retention policy supports queuing up messages from publishers independent of consumption. Since each message is intended to be processed only once, this retention type allows for a set of consumers that have non-overlapping interest on subjects. In other words, if multiple consumers are bound to a work-queue stream, they must have disjoint filter subjects. This is in contrast to a standard limits-based or interest-based stream, which supports multiple consumers with overlapping interest.
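To make "disjoint filter subjects" concrete, here is a minimal sketch of an overlap check based on the NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens). The helper name `SubjectsOverlap` is hypothetical and this is not the server's implementation; it only illustrates when two filters could both match the same subject.

```csharp
using System;

// Filters scoped to different regions never match the same subject.
Console.WriteLine(SubjectsOverlap("events.us.>", "events.eu.>"));          // False
// A catch-all filter overlaps with every other filter.
Console.WriteLine(SubjectsOverlap(">", "events.us.>"));                    // True
// "events.us.page_loaded" would match both of these filters.
Console.WriteLine(SubjectsOverlap("events.*.page_loaded", "events.us.>")); // True

// Hypothetical helper: returns true if at least one subject could match both filters.
static bool SubjectsOverlap(string a, string b)
{
    var ta = a.Split('.');
    var tb = b.Split('.');
    var shared = Math.Min(ta.Length, tb.Length);

    for (var i = 0; i < shared; i++)
    {
        // '>' matches one or more remaining tokens, and the other filter
        // still has at least one token here, so a common subject exists.
        if (ta[i] == ">" || tb[i] == ">")
            return true;

        // Two different literal tokens can never match the same subject.
        if (ta[i] != "*" && tb[i] != "*" && ta[i] != tb[i])
            return false;
    }

    // Without a trailing '>', both filters must have the same number of tokens.
    return ta.Length == tb.Length;
}
```

Under these rules, the filters `events.us.>` and `events.eu.>` used later in this example are disjoint, so both consumers can coexist on a work-queue stream.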

Like the interest policy, this retention policy is additive to any limits set on the stream. As a contrived example, if max-msgs is set to one with old messages being discarded, every new message received by the stream will result in the prior message being deleted, regardless of whether any subscriptions were available to process it.
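The contrived configuration described above might look like the following sketch. The stream name and subject are hypothetical, and it assumes the `StreamConfig` model in NATS.Net exposes `MaxMsgs` and `Discard` alongside `Retention`; check the model in the version you have installed.

```csharp
using NATS.Client.JetStream.Models;

// Hypothetical stream name and subject, used only for illustration.
var config = new StreamConfig("LATEST_EVENT", new[] { "latest.>" })
{
    // Messages are removed once a consumer acknowledges them...
    Retention = StreamConfigRetention.Workqueue,

    // ...but the limits still apply: keep at most one message,
    // and discard the oldest message when a new one arrives.
    MaxMsgs = 1,
    Discard = StreamConfigDiscard.Old,
};
```

With a configuration like this, an unprocessed message is replaced as soon as the next one is published, so the work-queue policy alone does not guarantee that every message gets processed.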

In this example, we will walk through the work-queue retention setup and behavior. If you are new to streams, it is recommended to read the limits-based stream example prior to reading this one.

$ nbe run jetstream/workqueue-stream/dotnet2

Code

Install NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.

using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. The connection is declared with await using, so it is disposed at the end of our scope, which flushes the buffers and closes the connection cleanly.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);

Access JetStream for managing streams and consumers as well as for publishing and consuming messages to and from the stream.

var js = new NatsJSContext(nats);


var streamName = "EVENTS";

Creating the stream

Define the stream configuration, specifying StreamConfigRetention.Workqueue for retention, and create the stream.

var stream = await js.CreateStreamAsync(new StreamConfig(streamName, new[] { "events.>" })
{
    Retention = StreamConfigRetention.Workqueue,
});

Queue messages

Publish a few messages.

await js.PublishAsync("events.us.page_loaded", "event-data");
await js.PublishAsync("events.us.mouse_clicked", "event-data");
await js.PublishAsync("events.us.input_focused", "event-data");
logger.LogInformation("published 3 messages");

Checking the stream info, we see three messages have been queued.

logger.LogInformation("# Stream info without any consumers");
await PrintStreamStateAsync(stream);

Adding a consumer

Now let’s add a pull consumer to process the queued messages.

var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("processor-1"));

Fetch and ack the queued messages.

await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 3 }))
{
    await msg.AckAsync();
    // To wait for the server to confirm the ack, use a double ack instead:
    // await msg.AckAsync(new AckOpts { DoubleAck = true });
}

Checking the stream info again, we see that no messages remain, since a work-queue stream removes each message once it has been acknowledged.

logger.LogInformation("# Stream info with one consumer");
await PrintStreamStateAsync(stream);

Exclusive non-filtered consumer

As noted in the description above, a work-queue stream can have at most one consumer with interest on a subject at any given time. Since the pull consumer above is not filtered, trying to create another one will fail.

logger.LogInformation("# Create an overlapping consumer");
try
{
    await stream.CreateConsumerAsync(new ConsumerConfig("processor-2"));
}
catch (NatsJSApiException e)
{
    logger.LogInformation("Error: {Message}", e.Error);
}

However, if we delete the first one, we can then add the new one.

await stream.DeleteConsumerAsync("processor-1");
await stream.CreateConsumerAsync(new ConsumerConfig("processor-2"));
logger.LogInformation("Created the new consumer");


await stream.DeleteConsumerAsync("processor-2");

Multiple filtered consumers

To create multiple consumers, a subject filter needs to be applied. For this example, we could scope each consumer to the geo that the event was published from, in this case us or eu.

logger.LogInformation("# Create non-overlapping consumers");


var consumer1 = await stream.CreateConsumerAsync(new ConsumerConfig("processor-us") { FilterSubject = "events.us.>" });
var consumer2 = await stream.CreateConsumerAsync(new ConsumerConfig("processor-eu") { FilterSubject = "events.eu.>" });


await js.PublishAsync("events.eu.mouse_clicked", "event-data");
await js.PublishAsync("events.us.page_loaded", "event-data");
await js.PublishAsync("events.us.input_focused", "event-data");
await js.PublishAsync("events.eu.page_loaded", "event-data");
logger.LogInformation("Published 4 messages");


await foreach (var msg in consumer1.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
    logger.LogInformation("us sub got: {Subject}", msg.Subject);
    await msg.AckAsync();
}


await foreach (var msg in consumer2.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
    logger.LogInformation("eu sub got: {Subject}", msg.Subject);
    await msg.AckAsync();
}

That’s it!

logger.LogInformation("Bye!");


async Task PrintStreamStateAsync(INatsJSStream jsStream)
{
    await jsStream.RefreshAsync();
    var state = jsStream.Info.State;
    logger.LogInformation(
        "Stream has messages:{Messages} first:{FirstSeq} last:{LastSeq} consumer_count:{ConsumerCount} num_subjects:{NumSubjects}",
        state.Messages,
        state.FirstSeq,
        state.LastSeq,
        state.ConsumerCount,
        state.NumSubjects);
}

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NBRUEGKDSZN4IWXND263LPPDW32QPCB56M7GVBSXKOO42CLAWJAYFXNW, Name = NBRUEGKDSZN4IWXND263LPPDW32QPCB56M7GVBSXKOO42CLAWJAYFXNW, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 192.168.160.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      published 3 messages
info: NATS-by-Example[0]
      # Stream info without any consumers
info: NATS-by-Example[0]
      Stream has messages:3 first:1 last:3 consumer_count:0 num_subjects:3
info: NATS-by-Example[0]
      # Stream info with one consumer
info: NATS-by-Example[0]
      Stream has messages:0 first:4 last:3 consumer_count:1 num_subjects:0
info: NATS-by-Example[0]
      # Create an overlapping consumer
info: NATS-by-Example[0]
      Error: ApiError { Code = 400, Description = multiple non-filtered consumers not allowed on workqueue stream, ErrCode = 10099 }
info: NATS-by-Example[0]
      Created the new consumer
info: NATS-by-Example[0]
      # Create non-overlapping consumers
info: NATS-by-Example[0]
      Published 4 messages
info: NATS-by-Example[0]
      us sub got: events.us.page_loaded
info: NATS-by-Example[0]
      us sub got: events.us.input_focused
info: NATS-by-Example[0]
      eu sub got: events.eu.mouse_clicked
info: NATS-by-Example[0]
      eu sub got: events.eu.page_loaded
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
