Object-Store Intro in Object-Store
The object-store (OS) capability in NATS is an abstraction over a stream which models message subjects as keys similar to KV, but with payloads that span multiple chunks. This allows for assets that are larger, and are typically loaded and read as readable/writable streams.
Code
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
using NATS.Client.ObjectStore.Models;
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("NATS-by-Example");
NATS_URL
environment variable can be used to pass the locations of the NATS servers.
var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Connect to NATS server. Since connection is disposable at the end of our scope we should flush our buffers and close connection cleanly.
var opts = new NatsOpts
{
Url = url,
LoggerFactory = loggerFactory,
Name = "NATS-by-Example",
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);
Object store basics
An object-store (OS) bucket is created by specifying a bucket name. Here we try to access a store called “configs”, if it doesn’t exist the API will create it:
var store = await obj.CreateObjectStore("configs");
You can get information on the object store by getting its info:
var status = await store.GetStatusAsync();
logger.LogInformation("The object store has {Size} bytes", status.Info.State.Bytes);
10MiB
const int bytes = 10_000_000;
var data = new byte[bytes];
Let’s add an entry to the object store
var info = await store.PutAsync(key: "a", data);
logger.LogInformation("Added entry {Name} ({Size} bytes)- '{Description}'", info.Name, info.Size, info.Description);
Entries in an object store are made from a “metadata” that describes the object And the payload. This allows you to store information about the significance of the entry separate from the raw data. You can update the metadata directly
await store.UpdateMetaAsync("a", new ObjectMetadata { Name = "a", Description = "still large data" });
we expect this store to only contain one entry You can list its contents:
var count = 0;
await foreach (var entry in store.ListAsync())
{
logger.LogInformation("Entry {Name} ({Size} bytes)- '{Description}'", info.Name, info.Size, info.Description);
count++;
}
logger.LogInformation("The object store contains {Count} entries", count);
Now lets retrieve the item we added
var data1 = await store.GetBytesAsync("a");
logger.LogInformation("Data has {Size} bytes", data1.Length);
You can watch an object store for changes:
var watcher = Task.Run(async () =>
{
await foreach (var m in store.WatchAsync(new NatsObjWatchOpts{IncludeHistory = false}))
{
logger.LogInformation(">>>>>>>> Watch: {Bucket} changed '{Name}' {Op}", m.Bucket, m.Name, m.Deleted ? "was deleted" : "was updated");
}
});
To delete an entry:
await store.DeleteAsync("a");
Because the client may be working with large assets, ObjectStore normally presents a “Stream” based API.
info = await store.PutAsync(new ObjectMetadata { Name = "b", Description = "set with a stream" }, new MemoryStream(data));
logger.LogInformation("Added entry {Name} ({Size} bytes)- '{Description}'", info.Name, info.Size, info.Description);
var ms = new MemoryStream();
info = await store.GetAsync("b", ms);
logger.LogInformation("Got entry {Name} ({Size} bytes)- '{Description}'", info.Name, info.Size, info.Description);
await obj.DeleteObjectStore("configs", CancellationToken.None);
That’s it!
logger.LogInformation("Bye!");
Output
info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://nats:4222 info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NAQ5RG6LC5UEKIQUEUJF7ZEODY33CYLHKAJEAP2GNYWLBVSJDCAQOACP, Name = NAQ5RG6LC5UEKIQUEUJF7ZEODY33CYLHKAJEAP2GNYWLBVSJDCAQOACP, Version = 2.10.4, ProtocolVersion = 1, GitCommit = abc47f7, GoVersion = go1.21.3, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 5, ClientIp = 192.168.224.3, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False } info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS-by-Example, NATS nats://nats:4222 info: NATS-by-Example[0] The object store has 0 bytes info: NATS-by-Example[0] Added entry a (10000000 bytes)- '(null)' info: NATS-by-Example[0] Entry a (10000000 bytes)- '(null)' info: NATS-by-Example[0] The object store contains 1 entries info: NATS-by-Example[0] Data has 10000000 bytes info: NATS-by-Example[0] >>>>>>>> Watch: configs changed 'a' was updated info: NATS-by-Example[0] Added entry b (10000000 bytes)- 'set with a stream' info: NATS-by-Example[0] Got entry b (10000000 bytes)- 'set with a stream' info: NATS-by-Example[0] Bye! info: NATS.Client.Core.NatsConnection[1001] Disposing connection NATS-by-Example
Install NuGet packages
NATS.Net
andMicrosoft.Extensions.Logging.Console
.