Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
  - The subscriber does not have an active connection to the server (i.e., the client is temporarily offline for some reason)
  - There is a network interruption where the message is ultimately dropped
- Messages are published to subjects, which consist of one or more concrete tokens, e.g. greet.bob. Subscribers can use wildcards to express interest in a set of matching subjects.
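To make the wildcard rules concrete, here is a minimal sketch of subject matching, assuming the standard NATS semantics where * matches exactly one token and > matches one or more trailing tokens. The SubjectMatcher class and its Matches method are hypothetical names used only for illustration. They are not part of NATS.Net, and in practice the server performs this matching for you.

```csharp
using System;

// Hypothetical helper for illustration only; not part of NATS.Net.
public static class SubjectMatcher
{
    // Returns true when the concrete subject matches the subscription pattern.
    public static bool Matches(string pattern, string subject)
    {
        var p = pattern.Split('.');
        var s = subject.Split('.');

        for (var i = 0; i < p.Length; i++)
        {
            // '>' must be the last pattern token and needs at least one
            // remaining subject token to match.
            if (p[i] == ">")
                return i == p.Length - 1 && s.Length > i;

            // The subject ran out of tokens before the pattern did.
            if (i >= s.Length)
                return false;

            // '*' matches any single token; otherwise tokens must be equal.
            if (p[i] != "*" && p[i] != s[i])
                return false;
        }

        // Without a trailing '>', token counts must be identical.
        return p.Length == s.Length;
    }
}
```

With this sketch, Matches("orders.>", "orders.new.1") and Matches("greet.*", "greet.bob") are true, while Matches("orders.*", "orders.new.1") and Matches("orders.>", "orders") are false.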
Code
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Serializers.Json;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
The NATS_URL environment variable can be used to pass the location of the NATS server.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to the NATS server. Because the connection is disposable (note the await using), it is disposed at the end of our scope, which should flush our buffers and close the connection cleanly.
var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    SerializerRegistry = NatsJsonSerializerRegistry.Default,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
Subscribe to a subject and start waiting for messages in the background.
await using var sub = await nats.SubscribeCoreAsync<Order>("orders.>");
logger.LogInformation("Waiting for messages...");
var task = Task.Run(async () =>
{
    await foreach (var msg in sub.Msgs.ReadAllAsync())
    {
        var order = msg.Data;
        logger.LogInformation("Subscriber received {Subject}: {Order}", msg.Subject, order);
    }

    logger.LogInformation("Unsubscribed");
});
Let’s publish a few orders.
for (int i = 0; i < 5; i++)
{
    logger.LogInformation("Publishing order {Index}...", i);
    await nats.PublishAsync($"orders.new.{i}", new Order(OrderId: i));
    await Task.Delay(500);
}
We can unsubscribe now that all orders are published. Unsubscribing or disposing the subscription should complete the message loop and exit the background task cleanly.
await sub.UnsubscribeAsync();
await task;
That’s it! We saw how to subscribe to a subject and publish messages that are delivered to subscribers with matching subjects.
logger.LogInformation("Bye!");
public record Order(int OrderId);
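The way unsubscribing ends the background loop in the code above can be illustrated in isolation. The following is a minimal sketch using a plain System.Threading.Channels channel, on the assumption that the subscription's Msgs reader behaves like any other ChannelReader: once the writer side is completed, ReadAllAsync drains what is left and the await foreach loop exits. ChannelLoopSketch and RunAsync are hypothetical names, and the sketch does not use NATS at all.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a subscription's message loop; no NATS involved.
public static class ChannelLoopSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        var received = new List<int>();

        // Background reader, shaped like the subscriber loop in the example.
        var reader = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                received.Add(item);
            }
            // Reached only after the writer has been completed.
        });

        // "Publish" a few items.
        for (var i = 0; i < 3; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        // Completing the writer plays the role of unsubscribing here:
        // the reader drains the remaining items and the loop ends.
        channel.Writer.Complete();
        await reader;

        return received;
    }
}
```

Awaiting the reader task after completing the writer mirrors the await task line in the example: it ensures every buffered item has been processed before the program moves on.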
Output
info: NATS.Client.Core.NatsConnection[1001]
Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
Received server info: ServerInfo { Id = NA5KDEZVXJXTIYMQAXRROBZL3WMLCO63YO2Z5QJOABXIFQZDN3AKTBF2, Name = NA5KDEZVXJXTIYMQAXRROBZL3WMLCO63YO2Z5QJOABXIFQZDN3AKTBF2, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.18.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
Waiting for messages...
info: NATS-by-Example[0]
Publishing order 0...
info: NATS-by-Example[0]
Subscriber received orders.new.0: Order { OrderId = 0 }
info: NATS-by-Example[0]
Publishing order 1...
info: NATS-by-Example[0]
Subscriber received orders.new.1: Order { OrderId = 1 }
info: NATS-by-Example[0]
Publishing order 2...
info: NATS-by-Example[0]
Subscriber received orders.new.2: Order { OrderId = 2 }
info: NATS-by-Example[0]
Publishing order 3...
info: NATS-by-Example[0]
Subscriber received orders.new.3: Order { OrderId = 3 }
info: NATS-by-Example[0]
Publishing order 4...
info: NATS-by-Example[0]
Subscriber received orders.new.4: Order { OrderId = 4 }
info: NATS-by-Example[0]
Unsubscribed
info: NATS-by-Example[0]
Bye!
info: NATS.Client.Core.NatsConnection[1001]
Disposing connection NATS-by-Example
Install NuGet packages
NATS.Net, NATS.Client.Serializers.Json and Microsoft.Extensions.Logging.Console.