Concurrent Message Processing in Messaging
By default, when a subscription is created, each received message is processed sequentially. Multiple subscriptions can be set up in a queue group, in which case the NATS server distributes messages across the members of the group.
However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.
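To make the difference concrete, here is a minimal sketch that uses no NATS connection at all. `FakeMessages` and `Handle` are hypothetical stand-ins for a subscription and a message handler, and the degree of parallelism of 4 is an arbitrary choice for illustration. It tracks how many handlers are running at once when the stream is consumed with `await foreach` and when it is consumed with `Parallel.ForEachAsync`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var gate = new object();
var inFlight = 0;
var maxInFlight = 0;

// Hypothetical stand-in for a subscription: yields a fixed number of messages.
static async IAsyncEnumerable<string> FakeMessages(int count)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Yield();
        yield return $"hello {i}";
    }
}

// Hypothetical handler: records how many handlers run at the same time.
async ValueTask Handle(string message, CancellationToken ct)
{
    lock (gate)
    {
        inFlight++;
        maxInFlight = Math.Max(maxInFlight, inFlight);
    }

    await Task.Delay(50, ct); // simulated work

    lock (gate)
    {
        inFlight--;
    }
}

// Sequential: the next message is not handled until the previous handler completes.
await foreach (var m in FakeMessages(8))
{
    await Handle(m, CancellationToken.None);
}
var maxSequential = maxInFlight;

// Concurrent: up to MaxDegreeOfParallelism handlers may run at once.
maxInFlight = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
await Parallel.ForEachAsync(FakeMessages(8), options, (m, ct) => Handle(m, ct));
var maxConcurrent = maxInFlight;

Console.WriteLine($"max handlers in flight: sequential={maxSequential}, concurrent={maxConcurrent}");
```

The sequential loop never has more than one handler in flight, while the parallel loop overlaps handlers up to the configured limit. When `MaxDegreeOfParallelism` is not set, `Parallel.ForEachAsync` defaults to the number of processors.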
$ nbe run messaging/concurrent/dotnet2
Code
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
The NATS_URL environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to the NATS server. Because the connection is declared with await using, it is disposed at the end of the scope, which should flush the buffers and close the connection cleanly.
var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
using var cts = new CancellationTokenSource();
Subscribe to a subject in a background task and process the incoming messages in parallel with Parallel.ForEachAsync.
var subscription = Task.Run(async () =>
{
    await Parallel.ForEachAsync(nats.SubscribeAsync<string>("greet", cancellationToken: cts.Token), async (msg, _) =>
    {
        Console.WriteLine($"Received {msg.Data}");
    });
});
Give some time for the subscription to start.
await Task.Delay(TimeSpan.FromSeconds(1));
for (int i = 0; i < 50; i++)
{
    await nats.PublishAsync("greet", $"hello {i}");
}
Give some time for the subscription to receive all the messages.
await Task.Delay(TimeSpan.FromSeconds(1));
await cts.CancelAsync();
await subscription;
That’s it!
logger.LogInformation("Bye!");
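The example stops the loop by cancelling the token given to SubscribeAsync, and the handler discards the token that Parallel.ForEachAsync passes to it (the `_` parameter). A handler that does slower work may want to observe cancellation as well. The following is a minimal sketch of that idea with no NATS connection: the integer range stands in for a message stream, and the delays, the item count, and the degree of parallelism of 2 are arbitrary values chosen for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 2,    // at most two handlers at a time
    CancellationToken = cts.Token, // cancelling this token stops the whole loop
};

var processed = 0;
var cancelled = false;

// Request cancellation long before all items could be handled.
cts.CancelAfter(TimeSpan.FromMilliseconds(200));

try
{
    await Parallel.ForEachAsync(Enumerable.Range(0, 1000), options, async (i, ct) =>
    {
        // Passing the handler's token lets in-progress work stop promptly.
        await Task.Delay(20, ct);
        Interlocked.Increment(ref processed);
    });
}
catch (OperationCanceledException)
{
    // Expected when the token is cancelled while the loop is running.
    cancelled = true;
}

Console.WriteLine($"cancelled={cancelled}, processed={processed} of 1000");
```

Whether a handler should abandon work on cancellation or finish the message it already started depends on the application; the sketch only shows where the token is available.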
Output
info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NBV3DDRVEHSPKTV7ZQV6PBYT6FAPORASTCDJLA4CCSC5XKKYEUVPJUI4, Name = NBV3DDRVEHSPKTV7ZQV6PBYT6FAPORASTCDJLA4CCSC5XKKYEUVPJUI4, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.22.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
Received hello 2
Received hello 3
Received hello 0
Received hello 1
Received hello 4
Received hello 8
Received hello 9
Received hello 10
Received hello 11
Received hello 12
Received hello 13
Received hello 14
Received hello 15
Received hello 6
Received hello 17
Received hello 18
Received hello 19
Received hello 20
Received hello 21
Received hello 22
Received hello 23
Received hello 24
Received hello 25
Received hello 26
Received hello 27
Received hello 28
Received hello 29
Received hello 30
Received hello 16
Received hello 31
Received hello 32
Received hello 33
Received hello 35
Received hello 36
Received hello 37
Received hello 38
Received hello 39
Received hello 40
Received hello 41
Received hello 42
Received hello 43
Received hello 44
Received hello 45
Received hello 46
Received hello 47
Received hello 48
Received hello 49
Received hello 7
Received hello 5
Received hello 34
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
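The "Received hello" lines in the output are not in publish order because the handlers run concurrently and finish at different times. When ordering does not matter but completeness does, one option is to collect the results in a thread-safe collection and check them afterwards. The sketch below illustrates this without a NATS connection: the generated payloads stand in for the 50 published messages, and the random delay and the degree of parallelism of 4 are assumptions made only to provoke reordering.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the 50 published messages.
var payloads = Enumerable.Range(0, 50).Select(i => $"hello {i}");

// ConcurrentBag is safe to add to from several handlers at once.
var received = new ConcurrentBag<string>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

await Parallel.ForEachAsync(payloads, options, async (payload, ct) =>
{
    // Uneven simulated work, so completion order can differ from publish order.
    await Task.Delay(Random.Shared.Next(1, 10), ct);
    received.Add(payload);
});

// Order is not preserved, so sort the sequence numbers before comparing.
var ids = received
    .Select(p => int.Parse(p.Split(' ')[1]))
    .OrderBy(n => n)
    .ToArray();
var complete = ids.SequenceEqual(Enumerable.Range(0, 50));

Console.WriteLine($"received {ids.Length} messages, complete set: {complete}");
```

If the application needs messages handled strictly in order, sequential processing (the default described at the top) is the simpler choice.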
Install NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.