NATS by Example

Concurrent Message Processing in Messaging

By default, when a subscription is created, each message it receives is processed sequentially. Multiple subscriptions can be set up in a queue group, in which case the NATS server distributes messages across the members of the group.

However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.

$ nbe run messaging/concurrent/dotnet2

Code

Install NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. The connection is disposable, so declaring it with await using lets it flush its buffers and close cleanly at the end of the scope.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);


// Token source used later to stop the subscription.
using var cts = new CancellationTokenSource();

Subscribe to a subject in a background task and process the incoming messages in parallel.

var subscription = Task.Run(async () =>
{
    await Parallel.ForEachAsync(nats.SubscribeAsync<string>("greet", cancellationToken: cts.Token), async (msg, _) =>
    {
        Console.WriteLine($"Received {msg.Data}");
    });
});
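When no options are passed, Parallel.ForEachAsync runs up to Environment.ProcessorCount handlers at a time. The following is a minimal sketch of bounding that explicitly with ParallelOptions. To stay runnable without a server, a hypothetical FakeMessages iterator stands in for nats.SubscribeAsync<string>("greet"), and the limit of 4 is an arbitrary illustrative value, not something the example above prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for nats.SubscribeAsync<string>("greet"):
// yields ten messages and then completes.
static async IAsyncEnumerable<string> FakeMessages()
{
    for (var i = 0; i < 10; i++)
    {
        await Task.Yield();
        yield return $"hello {i}";
    }
}

// Assumption: at most 4 handlers should run at the same time.
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

var gate = new object();
var inFlight = 0;     // handlers currently running
var maxInFlight = 0;  // highest concurrency observed
var processed = 0;    // total messages handled

await Parallel.ForEachAsync(FakeMessages(), options, async (msg, ct) =>
{
    lock (gate)
    {
        inFlight++;
        maxInFlight = Math.Max(maxInFlight, inFlight);
    }

    // Simulate some asynchronous work per message.
    await Task.Delay(50, ct);

    lock (gate)
    {
        inFlight--;
        processed++;
    }
});

Console.WriteLine($"processed={processed}, maxInFlight={maxInFlight}");
```

With a real subscription, the same options object could be passed as the second argument to Parallel.ForEachAsync in the code above. Message order is still not preserved across handlers.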

Give some time for the subscription to start.

await Task.Delay(TimeSpan.FromSeconds(1));


// Publish 50 messages to the subject.
for (int i = 0; i < 50; i++)
{
    await nats.PublishAsync("greet", $"hello {i}");
}

Give some time for the subscription to receive all the messages.

await Task.Delay(TimeSpan.FromSeconds(1));
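A fixed delay is simple, but it only guesses how long delivery takes. One alternative, sketched here under stated assumptions, is to count handled messages and complete a TaskCompletionSource once the expected number has arrived. A Channel<string> stands in for the NATS subscription so the sketch runs without a server; the names expected, received and allReceived are hypothetical and not part of the NATS client API.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int expected = 50;

// Stand-in for the NATS subscription (assumption: this is not the NATS API).
var channel = Channel.CreateUnbounded<string>();

var received = 0;
var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Process messages in parallel in the background.
var subscription = Task.Run(async () =>
{
    await Parallel.ForEachAsync(channel.Reader.ReadAllAsync(), (msg, _) =>
    {
        // Signal completion when the last expected message has been handled.
        if (Interlocked.Increment(ref received) == expected)
        {
            allReceived.TrySetResult();
        }
        return ValueTask.CompletedTask;
    });
});

// Publish the messages.
for (var i = 0; i < expected; i++)
{
    await channel.Writer.WriteAsync($"hello {i}");
}

// Wait for the last handler instead of sleeping; the timeout guards against hangs.
await allReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));

// Completing the writer ends the stream, which plays the role of cancelling the subscription.
channel.Writer.Complete();
await subscription;

Console.WriteLine($"Received {received} messages");
```

This only works when the number of expected messages is known in advance, which is the case in this example but often not in a long-running service.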


// Signal the subscription to stop.
await cts.CancelAsync();


// Wait for the background task to finish.
await subscription;

That’s it!

logger.LogInformation("Bye!");

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NBV3DDRVEHSPKTV7ZQV6PBYT6FAPORASTCDJLA4CCSC5XKKYEUVPJUI4, Name = NBV3DDRVEHSPKTV7ZQV6PBYT6FAPORASTCDJLA4CCSC5XKKYEUVPJUI4, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.22.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
Received hello 2
Received hello 3
Received hello 0
Received hello 1
Received hello 4
Received hello 8
Received hello 9
Received hello 10
Received hello 11
Received hello 12
Received hello 13
Received hello 14
Received hello 15
Received hello 6
Received hello 17
Received hello 18
Received hello 19
Received hello 20
Received hello 21
Received hello 22
Received hello 23
Received hello 24
Received hello 25
Received hello 26
Received hello 27
Received hello 28
Received hello 29
Received hello 30
Received hello 16
Received hello 31
Received hello 32
Received hello 33
Received hello 35
Received hello 36
Received hello 37
Received hello 38
Received hello 39
Received hello 40
Received hello 41
Received hello 42
Received hello 43
Received hello 44
Received hello 45
Received hello 46
Received hello 47
Received hello 48
Received hello 49
Received hello 7
Received hello 5
Received hello 34
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
