Iterating Over Multiple Subscriptions in Messaging
Core NATS subscriptions support a flexible subject model with tokens and wildcards, but there are cases that require setting up separate subscriptions (e.g. a user wants transport.cars, transport.planes and transport.ships, but not transport.spaceships).
Such an approach works and is performant, but it is not very convenient when reading the messages. This example shows how to achieve the required behaviour without sacrificing usability.
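To see why a wildcard alone is not enough here, the following is a minimal sketch of NATS-style subject matching. The `Matches` helper is hypothetical and written only for illustration (it is not part of NATS.Net); it assumes the usual wildcard rules, where `*` matches exactly one token and `>` matches one or more trailing tokens.

```csharp
using System;
using System.Linq;

// Hypothetical helper (not part of NATS.Net): checks whether a subject matches
// a pattern, where '*' matches exactly one token and '>' matches one or more
// trailing tokens.
static bool Matches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');
    for (var i = 0; i < p.Length; i++)
    {
        // '>' is only valid as the last token and needs at least one token to consume.
        if (p[i] == ">")
            return i == p.Length - 1 && s.Length > i;
        if (i >= s.Length || (p[i] != "*" && p[i] != s[i]))
            return false;
    }
    return p.Length == s.Length;
}

var wanted = new[] { "transport.cars", "transport.planes", "transport.ships" };
var subjects = wanted.Append("transport.spaceships");

foreach (var subject in subjects)
{
    var byWildcard = Matches("transport.*", subject);
    var byExplicit = wanted.Contains(subject);
    Console.WriteLine($"{subject}: wildcard={byWildcard}, explicit={byExplicit}");
}
```

Under these assumptions, `transport.*` also matches transport.spaceships, while the explicit list does not, which is why the example uses one subscription per subject.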
$ nbe run messaging/iterating-multiple-subscriptions/dotnet2
Code
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
The NATS_URL environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to the NATS server. Since the connection is disposable, at the end of our scope it should flush its buffers and close the connection cleanly.
var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
await nats.ConnectAsync();
using var cts = new CancellationTokenSource();
var s1 = nats.SubscribeAsync<int>("s1", cancellationToken: cts.Token);
var s2 = nats.SubscribeAsync<int>("s2", cancellationToken: cts.Token);
var s3 = nats.SubscribeAsync<int>("s3", cancellationToken: cts.Token);
var s4 = nats.SubscribeAsync<int>("s4", cancellationToken: cts.Token);
const int total = 80;
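// Read all four subscriptions as a single merged stream in the background
// and cancel them once the expected number of messages has arrived.
var subs = Task.Run(async () =>
{
    var count = 0;
    await foreach (var msg in AsyncEnumerableEx.Merge(s1, s2, s3, s4))
    {
        Console.WriteLine($"Received {msg.Subject}: {msg.Data}");
        if (++count == total)
            await cts.CancelAsync();
    }
});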
// Give the background task a moment to start reading before publishing.
await Task.Delay(1000);
for (int i = 0; i < total / 4; i++)
{
    await nats.PublishAsync("s1", i);
    await nats.PublishAsync("s2", i);
    await nats.PublishAsync("s3", i);
    await nats.PublishAsync("s4", i);
    await Task.Delay(100);
}
await subs;
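To make the merging step more concrete, here is a minimal sketch of how several asynchronous streams could be combined into one using System.Threading.Channels. This is not how AsyncEnumerableEx.Merge is implemented; `Source` and `MergeAll` are hypothetical helpers written for illustration, and `Source` merely stands in for a subscription so the sketch runs without a NATS server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a subscription: yields a few labelled values.
static async IAsyncEnumerable<string> Source(string name, int count)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(10);
        yield return $"{name}: {i}";
    }
}

// Hypothetical helper: pumps every source into one shared channel and
// yields items as they arrive; completes once all sources have finished.
static async IAsyncEnumerable<T> MergeAll<T>(IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateUnbounded<T>();

    // One pump per source; each preserves the order of its own source.
    var pumps = sources.Select(async source =>
    {
        await foreach (var item in source)
            await channel.Writer.WriteAsync(item);
    }).ToArray();

    // Complete the channel when all pumps finish, passing along any failure.
    _ = Task.WhenAll(pumps).ContinueWith(t => channel.Writer.TryComplete(t.Exception));

    await foreach (var item in channel.Reader.ReadAllAsync())
        yield return item;
}

var received = new List<string>();
await foreach (var item in MergeAll(new[] { Source("s1", 3), Source("s2", 3) }))
{
    received.Add(item);
    Console.WriteLine($"Received {item}");
}
Console.WriteLine($"Total: {received.Count}");
```

As in the output of the real example, items from one source keep their order, but the interleaving between sources is not guaranteed. The sketch also keeps pumping if the consumer stops early, so a production version would need cancellation support.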
That’s it!
logger.LogInformation("Bye!");
Output
info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NCL2Z2LULLH7O54YP6SHWYAETV5NDRNW4DJZHVGTERHCZXVO43QRMEDN, Name = NCL2Z2LULLH7O54YP6SHWYAETV5NDRNW4DJZHVGTERHCZXVO43QRMEDN, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.23.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS-by-Example, NATS nats://nats:4222
Received s1: 0
Received s2: 0
Received s3: 0
Received s4: 0
Received s1: 1
Received s2: 1
Received s3: 1
Received s4: 1
Received s2: 2
Received s1: 2
Received s3: 2
Received s4: 2
Received s1: 3
Received s2: 3
Received s3: 3
Received s4: 3
Received s2: 4
Received s1: 4
Received s3: 4
Received s4: 4
Received s1: 5
Received s2: 5
Received s3: 5
Received s4: 5
Received s1: 6
Received s2: 6
Received s3: 6
Received s4: 6
Received s1: 7
Received s2: 7
Received s3: 7
Received s4: 7
Received s3: 8
Received s1: 8
Received s2: 8
Received s4: 8
Received s1: 9
Received s2: 9
Received s3: 9
Received s4: 9
Received s1: 10
Received s2: 10
Received s3: 10
Received s4: 10
Received s1: 11
Received s2: 11
Received s3: 11
Received s4: 11
Received s1: 12
Received s2: 12
Received s3: 12
Received s4: 12
Received s1: 13
Received s2: 13
Received s3: 13
Received s4: 13
Received s1: 14
Received s3: 14
Received s4: 14
Received s2: 14
Received s3: 15
Received s2: 15
Received s4: 15
Received s1: 15
Received s1: 16
Received s2: 16
Received s3: 16
Received s4: 16
Received s1: 17
Received s2: 17
Received s3: 17
Received s4: 17
Received s1: 18
Received s2: 18
Received s3: 18
Received s4: 18
Received s1: 19
Received s2: 19
Received s3: 19
Received s4: 19
info: NATS-by-Example[0] Bye!
info: NATS.Client.Core.NatsConnection[1001] Disposing connection NATS-by-Example
Install the NuGet packages NATS.Net, System.Interactive.Async and Microsoft.Extensions.Logging.Console.