Object-Store Intro in Object-Store
The object-store (OS) capability in NATS is an abstraction over a stream which models message subjects as keys similar to KV, but with payloads that span multiple chunks. This allows for assets that are larger, and are typically loaded and read as readable/writable streams.
Code
import io
import os
import asyncio
import nats
from nats.js.api import ObjectMeta
from nats.js.errors import BucketNotFoundError
from nats.js.object_store import ObjectStore
servers = os.environ.get('NATS_URL', 'nats://localhost:4222').split(',')
bucket_name = 'configs'
Set up an async function that we’ll use later
async def notify_on_update(watcher: ObjectStore.ObjectWatcher):
while True:
e = await watcher.updates()
if e:
update_type = 'deleted' if e.deleted else 'updated'
print(f'{bucket_name} changed - {e.name} was {update_type}')
async def main():
Connect to NATS server
nc = await nats.connect(servers=servers)
Get a context to produce and consume messages from NATS JetStream
js = nc.jetstream()
Try to access a store called ‘configs’. Create the store if it doesn’t already exist.
try:
object_store = await js.object_store(bucket_name)
except BucketNotFoundError:
object_store = await js.create_object_store(bucket_name)
You can get information on the object store by getting its info:
status = await object_store.status()
print(f'the object store has {status.size} bytes')
10 MiB
data = bytes(10000000)
The Python client has a simple API to put()
Let’s add an entry to the object store
info = await object_store.put('a', data)
print(f'added entry {info.name} ({info.size} bytes)')
Let’s add another one with custom metadata
info = await object_store.put('b', data, meta=ObjectMeta(
description='large data'
))
print(f'added entry {info.name} ({info.size} bytes) "{info.description}"')
Entries in an object store are made from a ‘metadata’ that describes the object And the payload. This allows you to store information about the significance of the entry separate from the raw data. You can update the metadata directly
new_meta = info.meta
new_meta.description = 'still large data'
await object_store.update_meta('b', new_meta)
we expect this store to contain 2 entries You can list its contents
entries = await object_store.list()
print(f'the object store contains {len(entries)} entries')
Now lets retrieve the item
obr = await object_store.get('b')
data = obr.data
print(f'data has {len(data)} bytes')
If you are reading carefully, the payload we stored and read, is 10MiB in size, but the server by default has a max payload of about 1MiB. How can this be possible? - Ah that is what ObjectStore is about. It automatically split into multiple messages and stored in the stream. On read, it read multiple messages and put it together.
print(f'client has a max payload of {nc.max_payload} bytes')
You can watch an object store for changes:
watcher = await object_store.watch(include_history=False)
loop = asyncio.get_event_loop()
loop.create_task(notify_on_update(watcher))
To delete an entry:
await object_store.delete('a')
Because the client may be working with large assets, ObjectStore normally presents a ‘Stream’ based API. You can put data from anything that inherits BufferedIOBase.
buffered_data = io.BytesIO(data)
info = await object_store.put('c', buffered_data, nats.js.api.ObjectMeta(
description='set with a buffer'
))
print(f'added entry {info.name} ({info.size} bytes)- "{info.description}"')
To read the entry:
buffer = io.BytesIO()
result = await object_store.get('c', buffer)
you can read the info on the object:
print(f'{result.info.name} has {result.info.size} bytes')
to get the payload, read it from the buffer
buffer.seek(0)
chunk_size = 128 * 1024 # 128 KB in bytes
while True:
chunk = buffer.read(chunk_size)
if not chunk:
break
print(f'read {len(chunk)} bytes')
You can delete the object store by simply calling js.delete_object_store()
.
Note that after calling delete_object_store, the data is gone forever
await js.delete_object_store(bucket_name)
print('deleted object store')
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Output
the object store has 0 bytes added entry a (10000000 bytes) added entry b (10000000 bytes) "large data" the object store contains 2 entries data has 10000000 bytes client has a max payload of 1048576 bytes configs changed - a was updated configs changed - b was updated configs changed - a was deleted added entry c (10000000 bytes)- "set with a buffer" configs changed - c was updated c has 10000000 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 131072 bytes read 38528 bytes deleted object store