Interest-based Stream in JetStream
As the name suggests, the interest retention policy for a stream retains messages for as long as there are consumers which have interest in a particular message.
The base case is where there are no consumers for the streams and messages are being appended. What happens to those messages? By definition, they are immediately deleted from the stream since there are no consumers.
An interest-based stream provides a middle ground between an at-most-once quality of service (QoS) that core NATS provides, requiring all subscribers to be connected to receive a message, and a pure limits-based stream. As long as there are consumers for the stream whose subject filter overlaps with a message appended to a stream, the message won’t be deleted until a subscription bound to each consumer has successfully acks the message, terminates it, or the max redelivery has been reached.
Note that this retention policy is additive to any limits set on the stream. As a
contrived example, if max-msgs
is set to one with old messages being discarded,
every new message that is received by the stream will result in the prior message being
deleted regardless if any of the consumer subscriptions were available to process
the message.
In this example, we will walk through the interest-based retention behaviors in code. If you are new to streams, it is recommended to read the limits-based stream example prior to reading this one. Alternatively, if you are in need of a stream behaving as a queue, check out the work-queue stream.
$ nbe run jetstream/interest-stream/dotnet2View the source code or learn how to run this example yourself
Code
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to NATS server. Since connection is disposable at the end of our scope we should flush our buffers and close connection cleanly.
var opts = new NatsOpts
{
Url = url,
LoggerFactory = loggerFactory,
Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
Create JetStream Context
which provides methods to create
streams and consumers as well as convenience methods for publishing
to streams and consuming messages from the streams.
var js = new NatsJSContext(nats);
Creating the stream
Define the stream configuration, specifying InterestPolicy
for retention, and
create the stream.
var config = new StreamConfig(name: "EVENTS", subjects: new[] { "events.>" })
{
Retention = StreamConfigRetention.Interest,
};
var stream = await js.CreateStreamAsync(config);
To demonstrate the base case behavior of the stream without any consumers, we will publish a few messages to the stream.
await js.PublishAsync<object>(subject: "events.page_loaded", data: null);
await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
var ack = await js.PublishAsync<object>(subject: "events.input_focused", data: null);
logger.LogInformation("Published 3 messages");
We confirm that all three messages were published and the last message sequence is 3.
logger.LogInformation("Last message seq: {Seq}", ack.Seq);
Checking out the stream info, notice how zero messages are present in
the stream, but the last_seq
is 3 which matches the last ACKed
publish sequence above. Also notice that the first_seq
is one greater
which behaves as a sentinel value indicating the stream is empty. This
sequence has not been assigned to a message yet, but can be interpreted
as no messages available in this context.
logger.LogInformation("# Stream info without any consumers");
await PrintStreamStateAsync(stream);
Adding a consumer Now let’s add a pull consumer and publish a few
more messages. Also note that we are only creating the consumer and have not yet started consuming the messages. This is only to point out that a it is not required to be actively consuming messages to show interest, but it is the presence of a consumer which the stream cares about to determine retention of messages. pull
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("processor-1")
{
AckPolicy = ConsumerConfigAckPolicy.Explicit,
});
await js.PublishAsync<object>(subject: "events.page_loaded", data: null);
await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
If we inspect the stream info again, we will notice a few differences.
It shows two messages (which we expect) and the first and last sequences
corresponding to the two messages we just published. We also see that
the consumer_count
is now one.
logger.LogInformation("# Stream info with one consumer");
await PrintStreamStateAsync(stream);
await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
await msg.AckAsync(new AckOpts { DoubleAck = true });
}
What do we expect in the stream? No messages and the first_seq
has been set to
the next sequence number like in the base case.
☝️ As a quick aside on that second ack, We are using AckSync
here for this
example to ensure the stream state has been synced up for this subsequent
retrieval.
logger.LogInformation("# Stream info with one consumer and acked messages");
await PrintStreamStateAsync(stream);
Two or more consumers
Since each consumer represents a separate view over a stream, we would expect that if messages were processed by one consumer, but not the other, the messages would be retained. This is indeed the case.
var consumer2 = await stream.CreateConsumerAsync(new ConsumerConfig("processor-2")
{
AckPolicy = ConsumerConfigAckPolicy.Explicit,
});
await js.PublishAsync<object>(subject: "events.page_loaded", data: null);
await js.PublishAsync<object>(subject: "events.mouse_clicked", data: null);
Here we fetch 2 messages for processor-2
. There are two observations to
make here. First the fetched messages are the latest two messages that
were published just above and not any prior messages since these were
already deleted from the stream. This should be apparent now, but this
reinforces that a late consumer cannot retroactively show interest. The
second point is that the stream info shows that the latest two messages
are still present in the stream. This is also expected since the first
consumer had not yet processed them.
var msgMetas = new List<NatsJSMsgMetadata>();
await foreach (var msg in consumer2.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
await msg.AckAsync(new AckOpts { DoubleAck = true });
if (msg.Metadata is { } metadata)
{
msgMetas.Add(metadata);
}
}
logger.LogInformation("msg seqs {Seq1} and {Seq2}", msgMetas[0].Sequence.Stream, msgMetas[1].Sequence.Stream);
logger.LogInformation("# Stream info with two consumers, but only one set of acked messages");
await PrintStreamStateAsync(stream);
Fetching and ack’ing from the first consumer subscription will result in the messages being deleted.
await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 2 }))
{
await msg.AckAsync(new AckOpts { DoubleAck = true });
}
logger.LogInformation("# Stream info with two consumers having both acked");
await PrintStreamStateAsync(stream);
A final callout is that interest respects the FilterSubject
on a consumer.
For example, if a consumer defines a filter only for events.mouse_clicked
events
then it won’t be considered interested in events such as events.input_focused
.
await stream.CreateConsumerAsync(new ConsumerConfig("processor-3")
{
AckPolicy = ConsumerConfigAckPolicy.Explicit,
FilterSubject = "events.mouse_clicked",
});
await js.PublishAsync<object>(subject: "events.input_focused", data: null);
Fetch and Terminate
(also works) and ack from the first consumers that do have interest.
await foreach (var msg in consumer.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 1 }))
{
await msg.AckTerminateAsync();
}
await foreach (var msg in consumer2.FetchAsync<string>(opts: new NatsJSFetchOpts { MaxMsgs = 1 }))
{
await msg.AckAsync(new AckOpts { DoubleAck = true });
}
logger.LogInformation("# Stream info with three consumers with interest from two");
await PrintStreamStateAsync(stream);
That’s it!
logger.LogInformation("Bye!");
async Task PrintStreamStateAsync(INatsJSStream jsStream)
{
await jsStream.RefreshAsync();
var state = jsStream.Info.State;
logger.LogInformation(
"Stream has messages:{Messages} first:{FirstSeq} last:{LastSeq} consumer_count:{ConsumerCount} num_subjects:{NumSubjects}",
state.Messages,
state.FirstSeq,
state.LastSeq,
state.ConsumerCount,
state.NumSubjects);
}
Output
info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://nats:4222 info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NDMT742URUINNER4MQPYJSWSTLJCQ2DYTPPGNBTX4JPLUM3K4D3XVTZL, Name = NDMT742URUINNER4MQPYJSWSTLJCQ2DYTPPGNBTX4JPLUM3K4D3XVTZL, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 192.168.144.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False } info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS-by-Example, NATS nats://nats:4222 info: NATS-by-Example[0] Published 3 messages info: NATS-by-Example[0] Last message seq: 3 info: NATS-by-Example[0] # Stream info without any consumers info: NATS-by-Example[0] Stream has messages:0 first:4 last:3 consumer_count:0 num_subjects:0 info: NATS-by-Example[0] # Stream info with one consumer info: NATS-by-Example[0] Stream has messages:2 first:4 last:5 consumer_count:1 num_subjects:2 info: NATS-by-Example[0] # Stream info with one consumer and acked messages info: NATS-by-Example[0] Stream has messages:0 first:6 last:5 consumer_count:1 num_subjects:0 info: NATS-by-Example[0] msg seqs 6 and 7 info: NATS-by-Example[0] # Stream info with two consumers, but only one set of acked messages info: NATS-by-Example[0] Stream has messages:2 first:6 last:7 consumer_count:2 num_subjects:2 info: NATS-by-Example[0] # Stream info with two consumers having both acked info: NATS-by-Example[0] Stream has messages:0 first:8 last:7 consumer_count:2 num_subjects:0 info: NATS-by-Example[0] # Stream info with three consumers with interest from two info: NATS-by-Example[0] Stream has messages:0 first:9 last:8 consumer_count:3 num_subjects:0 info: NATS-by-Example[0] Bye! info: NATS.Client.Core.NatsConnection[1001] Disposing connection NATS-by-Example
Install NuGet packages
NATS.Net
andMicrosoft.Extensions.Logging.Console
.