NATS by Example

Iterating Over Multiple Subscriptions in Messaging

Core NATS subscriptions support a flexible subject model with tokens and wildcards, but some cases require setting up separate subscriptions (for example, a user wants transport.cars, transport.planes and transport.ships, but not transport.spaceships).

This approach works and performs well, but it is not very convenient when reading the messages, because each subscription has to be consumed separately. This example shows how to achieve the required behaviour without sacrificing usability.
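To see why a single wildcard subscription is not enough here, the following minimal sketch models the NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens). `SubjectMatches` is a hypothetical helper written only for illustration; real matching is performed by the NATS server, not by client code like this.

```csharp
using System;

// Hypothetical helper illustrating NATS wildcard rules.
// It is not part of NATS.Net; the server does the real matching.
static bool SubjectMatches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');

    for (var i = 0; i < p.Length; i++)
    {
        // ">" must be the last pattern token and matches one or more remaining tokens.
        if (p[i] == ">")
            return i == p.Length - 1 && s.Length > i;

        // The subject ran out of tokens before the pattern did.
        if (i >= s.Length)
            return false;

        // "*" matches exactly one token; any other token must match literally.
        if (p[i] != "*" && p[i] != s[i])
            return false;
    }

    // Without ">", pattern and subject must have the same number of tokens.
    return p.Length == s.Length;
}

var subjects = new[]
{
    "transport.cars", "transport.planes", "transport.ships", "transport.spaceships",
};

foreach (var subject in subjects)
    Console.WriteLine($"transport.* matches {subject}: {SubjectMatches("transport.*", subject)}");
```

Every line prints True, including the one for transport.spaceships, so the wildcard delivers a subject the user did not ask for. Three exact-subject subscriptions avoid that, at the cost of having three message streams to read.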

$ nbe run messaging/iterating-multiple-subscriptions/dotnet2

Code

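Install the NuGet packages NATS.Net, System.Interactive.Async and Microsoft.Extensions.Logging.Console. System.Interactive.Async provides the AsyncEnumerableEx.Merge method used to combine the subscriptions.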

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;


using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");

The NATS_URL environment variable can be used to pass the locations of the NATS servers.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";

Connect to the NATS server. The connection is disposable, so declaring it with await using ensures that at the end of our scope the buffers are flushed and the connection is closed cleanly.

var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
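await using var nats = new NatsConnection(opts);

// Open the connection to the server.
await nats.ConnectAsync();

// Cancelling this token ends all four subscriptions at once.
using var cts = new CancellationTokenSource();

// Create one subscription per subject. Each call returns an async sequence of messages.
var s1 = nats.SubscribeAsync<int>("s1", cancellationToken: cts.Token);
var s2 = nats.SubscribeAsync<int>("s2", cancellationToken: cts.Token);
var s3 = nats.SubscribeAsync<int>("s3", cancellationToken: cts.Token);
var s4 = nats.SubscribeAsync<int>("s4", cancellationToken: cts.Token);

// Total number of messages published across the four subjects.
const int total = 80;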


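// Consume all four subscriptions as a single stream in a background task.
var subs = Task.Run(async () =>
{
    var count = 0;

    // AsyncEnumerableEx.Merge combines the four async sequences into one,
    // yielding messages from any subscription as they arrive.
    await foreach (var msg in AsyncEnumerableEx.Merge(s1, s2, s3, s4))
    {
        Console.WriteLine($"Received {msg.Subject}: {msg.Data}");

        // Cancel the subscriptions once every published message has been received.
        if (++count == total)
            await cts.CancelAsync();
    }
});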


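// Give the subscriptions a moment to be established before publishing.
await Task.Delay(1000);

// Publish 20 rounds of one message per subject, 80 messages in total.
for (int i = 0; i < total / 4; i++)
{
    await nats.PublishAsync("s1", i);
    await nats.PublishAsync("s2", i);
    await nats.PublishAsync("s3", i);
    await nats.PublishAsync("s4", i);
    await Task.Delay(100);
}

// Wait until the consumer task has received every message and stopped.
await subs;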

That’s it!

logger.LogInformation("Bye!");
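The merging step is what makes the four subscriptions readable in one loop. For readers curious about what such a merge involves, here is a minimal sketch that combines several async sequences through a shared channel from System.Threading.Channels. `MergeViaChannel` and `Ticks` are hypothetical names invented for this illustration, the code is not taken from System.Interactive.Async, and it leaves out cancellation for brevity. It runs without a NATS server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: merges async sequences by pumping each one into a shared channel.
static async IAsyncEnumerable<T> MergeViaChannel<T>(params IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateUnbounded<T>();

    // Start one pump per source; each copies its items into the channel.
    var pumps = sources.Select(async source =>
    {
        await foreach (var item in source)
            await channel.Writer.WriteAsync(item);
    }).ToArray();

    // Complete the channel once every pump has finished, passing on any failure.
    _ = Task.WhenAll(pumps).ContinueWith(
        t => channel.Writer.TryComplete(t.Exception),
        TaskScheduler.Default);

    // Yield items in the order they reach the channel.
    await foreach (var item in channel.Reader.ReadAllAsync())
        yield return item;
}

// Hypothetical stand-in for a subscription: emits `count` integers with a delay between them.
static async IAsyncEnumerable<int> Ticks(int start, int count, int delayMs)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(delayMs);
        yield return start + i;
    }
}

var received = new List<int>();
await foreach (var value in MergeViaChannel(Ticks(0, 3, 10), Ticks(100, 3, 15)))
    received.Add(value);

// Arrival order depends on timing, so sort before printing.
Console.WriteLine(string.Join(", ", received.OrderBy(v => v)));
// prints: 0, 1, 2, 100, 101, 102
```

The merged loop ends only after every source has completed, which is why the NATS example cancels the shared token once the expected number of messages has arrived.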

Output

info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NCL2Z2LULLH7O54YP6SHWYAETV5NDRNW4DJZHVGTERHCZXVO43QRMEDN, Name = NCL2Z2LULLH7O54YP6SHWYAETV5NDRNW4DJZHVGTERHCZXVO43QRMEDN, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.23.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
Received s1: 0
Received s2: 0
Received s3: 0
Received s4: 0
Received s1: 1
Received s2: 1
Received s3: 1
Received s4: 1
Received s2: 2
Received s1: 2
Received s3: 2
Received s4: 2
Received s1: 3
Received s2: 3
Received s3: 3
Received s4: 3
Received s2: 4
Received s1: 4
Received s3: 4
Received s4: 4
Received s1: 5
Received s2: 5
Received s3: 5
Received s4: 5
Received s1: 6
Received s2: 6
Received s3: 6
Received s4: 6
Received s1: 7
Received s2: 7
Received s3: 7
Received s4: 7
Received s3: 8
Received s1: 8
Received s2: 8
Received s4: 8
Received s1: 9
Received s2: 9
Received s3: 9
Received s4: 9
Received s1: 10
Received s2: 10
Received s3: 10
Received s4: 10
Received s1: 11
Received s2: 11
Received s3: 11
Received s4: 11
Received s1: 12
Received s2: 12
Received s3: 12
Received s4: 12
Received s1: 13
Received s2: 13
Received s3: 13
Received s4: 13
Received s1: 14
Received s3: 14
Received s4: 14
Received s2: 14
Received s3: 15
Received s2: 15
Received s4: 15
Received s1: 15
Received s1: 16
Received s2: 16
Received s3: 16
Received s4: 16
Received s1: 17
Received s2: 17
Received s3: 17
Received s4: 17
Received s1: 18
Received s2: 18
Received s3: 18
Received s4: 18
Received s1: 19
Received s2: 19
Received s3: 19
Received s4: 19
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
