JSON for Message Payloads in Messaging
The basic structure of a NATS message modeled in the client libraries includes the subject the message was published to, the application-defined payload, and an optional set of headers (for requests, there is also a reply-to subject). The payload is a sequence of bytes, so it is up to the application to define how to serialize and deserialize the payload.
JSON is a ubiquitous and simple data-interchange format that is supported in virtually all programming languages. This example demonstrates how to serialize and deserialize a message payload using a JSON library.
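Before bringing NATS into the picture, the round trip itself can be sketched with System.Text.Json alone. This is a minimal illustration, not part of the example program: the MyData record is repeated from the example so the snippet stands on its own, and the byte array stands in for a message payload.

```csharp
using System;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

// Serialize the object to UTF-8 JSON bytes: this is what would travel as the payload.
var original = new MyData { Id = 1, Name = "Bob" };
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(original);
Console.WriteLine(Encoding.UTF8.GetString(payload)); // {"id":1,"name":"Bob"}

// Deserialize the bytes back into an object, as a receiver would.
MyData? decoded = JsonSerializer.Deserialize<MyData>(payload);
Console.WriteLine(decoded); // MyData { Id = 1, Name = Bob }

// Same shape as the MyData record used in the example (repeated here for self-containment).
public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
```

The JsonPropertyName attributes are what produce the lowercase keys on the wire; without them the property names would be written as declared.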
Code
using System;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
The NATS_URL environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to the NATS server. Since the connection is disposed at the end of our scope, the buffers should be flushed and the connection closed cleanly.
var opts = new NatsOpts
{
    Url = url,
    LoggerFactory = loggerFactory,
    Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
await nats.ConnectAsync();
When subscribing or publishing, you can use the generated JSON serializer to serialize and deserialize the JSON payload. This example also shows how to work with the raw JSON payload and with binary data. For more information about the serializers, see our documentation.
var mySerializer = new NatsJsonContextSerializer<MyData>(MyJsonContext.Default);
// Subscriber 1: payloads are deserialized into MyData by the generated serializer.
var subIterator1 = await nats.SubscribeCoreAsync<MyData>("data", serializer: mySerializer);
var subTask1 = Task.Run(async () =>
{
    logger.LogInformation("Waiting for messages...");
    await foreach (var msg in subIterator1.Msgs.ReadAllAsync())
    {
        // An empty payload is used as the end-of-messages signal.
        if (msg.Data is null)
        {
            logger.LogInformation("Received empty payload: End of messages");
            break;
        }
        var data = msg.Data;
        logger.LogInformation("Received deserialized object {Data}", data);
    }
});
// Subscriber 2: receives the same messages as raw bytes and decodes them as UTF-8 text.
var subIterator2 = await nats.SubscribeCoreAsync<NatsMemoryOwner<byte>>("data");
var subTask2 = Task.Run(async () =>
{
    logger.LogInformation("Waiting for messages...");
    await foreach (var msg in subIterator2.Msgs.ReadAllAsync())
    {
        // Dispose the memory owner once the message has been handled.
        using var memoryOwner = msg.Data;
        if (memoryOwner.Length == 0)
        {
            logger.LogInformation("Received empty payload: End of messages");
            break;
        }
        var json = Encoding.UTF8.GetString(memoryOwner.Span);
        logger.LogInformation("Received raw JSON {Json}", json);
    }
});
// Publish a typed object using the generated JSON serializer.
await nats.PublishAsync<MyData>(subject: "data", data: new MyData { Id = 1, Name = "Bob" }, serializer: mySerializer);
// Publish JSON that is already encoded as a UTF-8 byte array.
await nats.PublishAsync<byte[]>(subject: "data", data: Encoding.UTF8.GetBytes("""{"id":2,"name":"Joe"}"""));
// Publish JSON encoded directly into a buffer writer.
var alice = """{"id":3,"name":"Alice"}""";
var bw = new NatsBufferWriter<byte>();
var byteCount = Encoding.UTF8.GetByteCount(alice);
var memory = bw.GetMemory(byteCount);
Encoding.UTF8.GetBytes(alice, memory.Span);
bw.Advance(byteCount);
await nats.PublishAsync<NatsBufferWriter<byte>>(subject: "data", data: bw);
// Publish an empty payload to tell both subscribers to stop.
await nats.PublishAsync(subject: "data");
// Wait for both subscribers to finish.
await Task.WhenAll(subTask1, subTask2);
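The GetMemory / write / Advance sequence used for the third message can be tried in isolation. The sketch below is an illustration rather than part of the example: it assumes the standard-library ArrayBufferWriter<byte> as a stand-in for NatsBufferWriter<byte>, since both are driven through the same GetMemory and Advance calls shown above, and it needs no NATS connection.

```csharp
using System;
using System.Buffers;
using System.Text;

var json = """{"id":3,"name":"Alice"}""";

// ArrayBufferWriter<byte> is a stand-in for NatsBufferWriter<byte> (an assumption of this sketch).
var writer = new ArrayBufferWriter<byte>();

// Ask the writer for at least as many bytes as the encoded string needs.
var byteCount = Encoding.UTF8.GetByteCount(json);
var memory = writer.GetMemory(byteCount);

// Encode directly into the writer's buffer, then commit the bytes actually written.
var written = Encoding.UTF8.GetBytes(json, memory.Span);
writer.Advance(written);

// The committed region now holds exactly the UTF-8 encoded JSON.
Console.WriteLine(writer.WrittenCount);
Console.WriteLine(Encoding.UTF8.GetString(writer.WrittenSpan));
```

Advance must be called with the number of bytes written; until then the writer treats the requested memory as scratch space and nothing is considered part of the payload.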
That’s it!
logger.LogInformation("Bye!");
Serializer generator
[JsonSerializable(typeof(MyData))]
internal partial class MyJsonContext : JsonSerializerContext;
public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
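The generated context can also be exercised directly with System.Text.Json, which may help when checking what it produces without a NATS server. This is a sketch under the assumption that the project builds with the System.Text.Json source generator enabled (the default in recent .NET SDKs); the context and record are repeated from above so the snippet is self-contained.

```csharp
using System;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

// MyJsonContext.Default.MyData is the JsonTypeInfo<MyData> emitted by the source generator.
var typeInfo = MyJsonContext.Default.MyData;

// Serialize using the generated metadata instead of runtime reflection.
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(new MyData { Id = 2, Name = "Joe" }, typeInfo);
Console.WriteLine(Encoding.UTF8.GetString(payload)); // {"id":2,"name":"Joe"}

// Deserialize with the same generated metadata.
MyData? decoded = JsonSerializer.Deserialize<MyData>(payload, typeInfo);
Console.WriteLine(decoded); // MyData { Id = 2, Name = Joe }

[JsonSerializable(typeof(MyData))]
internal partial class MyJsonContext : JsonSerializerContext
{
}

public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}
```

Each type that should be serializable through the context needs its own JsonSerializable attribute on the context class.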
Output
info: NATS.Client.Core.NatsConnection[1001]
      Try to connect NATS nats://nats:4222
info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005]
      Received server info: ServerInfo { Id = NCGBX2DWLQ6UCLYUSRL7CE4C5U4OIFNM3XPHG3IEVLHT7BN6AFOPAEBP, Name = NCGBX2DWLQ6UCLYUSRL7CE4C5U4OIFNM3XPHG3IEVLHT7BN6AFOPAEBP, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 172.20.0.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
info: NATS.Client.Core.NatsConnection[1001]
      Connect succeed NATS-by-Example, NATS nats://nats:4222
info: NATS-by-Example[0]
      Waiting for messages...
info: NATS-by-Example[0]
      Waiting for messages...
info: NATS-by-Example[0]
      Received deserialized object MyData { Id = 1, Name = Bob }
info: NATS-by-Example[0]
      Received deserialized object MyData { Id = 2, Name = Joe }
info: NATS-by-Example[0]
      Received deserialized object MyData { Id = 3, Name = Alice }
info: NATS-by-Example[0]
      Received empty payload: End of messages
info: NATS-by-Example[0]
      Received raw JSON {"id":1,"name":"Bob"}
info: NATS-by-Example[0]
      Received raw JSON {"id":2,"name":"Joe"}
info: NATS-by-Example[0]
      Received raw JSON {"id":3,"name":"Alice"}
info: NATS-by-Example[0]
      Received empty payload: End of messages
info: NATS-by-Example[0]
      Bye!
info: NATS.Client.Core.NatsConnection[1001]
      Disposing connection NATS-by-Example
Install NuGet packages NATS.Net and Microsoft.Extensions.Logging.Console.