Confirmed Message Ack in JetStream
A confirmed message ack means that the client waits for an ack from the server to ensure that the ack was received and processed. The functionality can be found in various clients under the following:
Name | Clients |
---|---|
ack ack | Javascript |
double ack | Rust, C# .NET V2 |
ack sync | Go, Python, Java, C |
Code
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to NATS server. Since connection is disposable at the end of our scope we should flush our buffers and close connection cleanly.
var opts = new NatsOpts
{
Url = url,
Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
Get a JetStream Context
var js = new NatsJSContext(nats);
Stream Setup
var stream = "verifyAckStream";
var subject = "verifyAckSubject";
var consumerName1 = "consumer1";
var consumerName2 = "consumer2";
Remove the stream first so we have a clean starting point.
try
{
await js.DeleteStreamAsync(stream);
}
catch (NatsJSApiException e) when (e is { Error.Code: 404 })
{
}
Create the stream
var streamConfig = new StreamConfig(stream, [subject])
{
Storage = StreamConfigStorage.Memory,
};
await js.CreateStreamAsync(streamConfig);
Publish a couple messages, so we can look at the state
await js.PublishAsync(subject, "A");
await js.PublishAsync(subject, "B");
Consume a message with 2 different consumers The first consumer will Ack without confirmation The second consumer will AckSync which confirms that ack was handled.
Consumer 1, regular ack
Console.WriteLine("Consumer 1");
var consumer1 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName1));
Console.WriteLine(" Start");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");
var next = await consumer1.NextAsync<string>();
refresh the consumer to update it’s state
await consumer1.RefreshAsync();
Console.WriteLine(" After received but before ack");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");
if (next is { } msg1)
{
await msg1.AckAsync();
}
await consumer1.RefreshAsync();
Console.WriteLine(" After ack");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");
Consumer 2 Double Ack
var consumer2 = await js.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumerName2));
Console.WriteLine("Consumer 2");
Console.WriteLine(" Start");
Console.WriteLine($" pending messages: {consumer1.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer1.Info.NumAckPending}");
next = await consumer2.NextAsync<string>();
await consumer2.RefreshAsync();
Console.WriteLine(" After received but before ack");
Console.WriteLine($" pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}");
if (next is { } msg2)
{
await msg2.AckAsync(new AckOpts { DoubleAck = true });
}
await consumer2.RefreshAsync();
Console.WriteLine(" After ack");
Console.WriteLine($" pending messages: {consumer2.Info.NumPending}");
Console.WriteLine($" messages with ack pending: {consumer2.Info.NumAckPending}");
Output
info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://nats:4222 info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NA4I74EZCLUSA4FD7XDAPHXW4P2CLLUHRM4LO5WXG4LN3QE46LK2EYWE, Name = NA4I74EZCLUSA4FD7XDAPHXW4P2CLLUHRM4LO5WXG4LN3QE46LK2EYWE, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.18.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False } info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS-by-Example, NATS nats://nats:4222 info: NATS-by-Example[0] Consumer 1, Start # pending messages: 2, messages with ack pending: 0 info: NATS-by-Example[0] Consumer 1, After received but before ack # pending messages: 1, messages with ack pending: 1 info: NATS-by-Example[0] Consumer 1, After ack # pending messages: 1, messages with ack pending: 0 info: NATS-by-Example[0] Consumer 2, Start # pending messages: 2, messages with ack pending: 0 info: NATS-by-Example[0] Consumer 2, After received but before ack # pending messages: 1, messages with ack pending: 1 info: NATS-by-Example[0] Consumer 2, After ack # pending messages: 1, messages with ack pending: 0 info: NATS-by-Example[0] Bye! info: NATS.Client.Core.NatsConnection[1001] Disposing connection NATS-by-Example
Install NuGet packages
NATS.Net
andMicrosoft.Extensions.Logging.Console
.