Subject-Mapped Partitions in JetStream
A stream provides a total order over the set of messages it stores. If configured with replicas, each stream has a dedicated Raft group which ensures every message published to the stream is replicated to a majority of the servers holding a replica. On a successful publish, every replica of the stream is guaranteed to have the same order of messages. Similarly, consumers on that stream will always receive messages in the order they are stored.
This single, serialized stream is highly performant; however, for some use cases there may be a need to scale out message publishing or consumption. This can be achieved by creating N streams, each of which represents one partition of the total set of messages. The trade-off is the ability to support concurrent publishes to N streams while giving up total order across streams.
However, we can still ensure deterministic distribution of messages to a given stream using a subject-mapping feature called deterministic subject token partitioning.
This example shows how to configure the subject mapping and the streams, and demonstrates how published messages are transparently routed and written to each stream partition.
As a reminder, when working with subject mappings, the NATS CLI provides a command to test them. It takes the source subject, the destination subject, and one or more subjects to test.
$ nats server mapping "events.*" "events.{{wildcard(1)}}.{{partition(5,1)}}" "events.1"
events.1.4
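Because the partition number is derived from a hash of the mapped token, the same subject always resolves to the same partition. As a quick aside (the extra test subjects here are arbitrary), re-running the command with other subjects shows each one consistently landing on one of the five partitions, and repeating a given subject always yields the same result:
nats server mapping "events.*" "events.{{wildcard(1)}}.{{partition(5,1)}}" "events.2"
nats server mapping "events.*" "events.{{wildcard(1)}}.{{partition(5,1)}}" "events.42"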
Code
#!/bin/sh
set -euo pipefail
unset NATS_URL
Define an accounts.conf file which is included in each of the individual node configs. Notice the mappings option defined on the APP account, which takes a published message such as events.1 and maps it to events.1.4, where the last token indicates the deterministic partition number. In this case, the second token 1 is mapped to a partition between 0 and 4 (five total partitions).
Note: you can have more than one token when defining the partitioning. If you require a > wildcard, then you need at least one * token for the partitioning. For example:
"events.*.>": "events.{{partition(5,1)}}.{{wildcard(1)}}.>"
cat <<- EOF > accounts.conf
accounts: {
  SYS: {
    users: [{user: sys, password: sys}]
  }
  APP: {
    jetstream: true,
    users: [{user: app, password: app}]
    mappings: {
      "events.*": "events.{{wildcard(1)}}.{{partition(5,1)}}"
    }
  }
}
system_account: SYS
EOF
Declare the configuration per node, including the shared configuration. Note that if decentralized auth is being used, mappings can be defined on account JWTs via the nsc tool.
cat <<- EOF > n1.conf
port: 4222
http_port: 8222
server_name: n1
include accounts.conf
jetstream: {
  store_dir: "./n1"
}
cluster: {
  name: c1,
  port: 6222,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF
cat <<- EOF > n2.conf
port: 4223
http_port: 8223
server_name: n2
include accounts.conf
jetstream: {
  store_dir: "./n2"
}
cluster: {
  name: c1,
  port: 6223,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF
cat <<- EOF > n3.conf
port: 4224
http_port: 8224
server_name: n3
include accounts.conf
jetstream: {
  store_dir: "./n3"
}
cluster: {
  name: c1,
  port: 6224,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF
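As an optional aside before starting the servers (assuming nats-server's -t flag, which tests a configuration and exits), each config can be validated up front:
for f in n1.conf n2.conf n3.conf; do
  nats-server -t -c "$f"
done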
Start the cluster and wait a few seconds for a leader to be elected.
nats-server -c n1.conf > /dev/null 2>&1 &
nats-server -c n2.conf > /dev/null 2>&1 &
nats-server -c n3.conf > /dev/null 2>&1 &
sleep 3
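Instead of relying solely on a fixed sleep, a small readiness loop can wait until the JetStream API responds for the APP account (a hedged sketch using nats account info, which will keep failing until the meta leader is elected):
until nats --server nats://localhost:4222 --user app --password app account info > /dev/null 2>&1; do
  sleep 1
done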
Save and select the default context for convenience.
nats context save \
--server "nats://localhost:4222,nats://localhost:4223,nats://localhost:4224" \
--user app \
--password app \
default > /dev/null
nats context select default > /dev/null
Create five streams modeling the partitions. Note that the --subjects option corresponds to the subject mapping we defined above: events.*.0, events.*.1, etc.
for i in $(seq 0 4); do
nats stream add \
--retention=limits \
--storage=file \
--replicas=3 \
--discard=old \
--dupe-window=2m \
--max-age=-1 \
--max-msgs=-1 \
--max-bytes=-1 \
--max-msg-size=-1 \
--max-msgs-per-subject=-1 \
--max-consumers=-1 \
--allow-rollup \
--no-deny-delete \
--no-deny-purge \
--subjects="events.*.$i" \
"events-$i" > /dev/null
done
Report the streams to confirm they are present.
nats stream report
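As an optional check (not in the original script), inspecting one of the streams confirms it is bound to the expected partition subject, e.g. events.*.0:
nats stream info events-0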
Run a benchmark with one publisher using synchronous acks. The --multisubject option appends the sequence number as a token to the base subject, events in this case. The --stream option is simply an override of the default stream name used by the benchmark and does not impact behavior.
nats bench \
--js \
--multisubject \
--pub 1 \
--msgs 200000 \
--syncpub \
--no-progress \
--stream "events-0" \
"events"
As expected, the throughput is quite low due to the synchronous publishes. However, looking at the stream report again, we see that the messages have been distributed across the streams.
nats stream report
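As an aside (not part of the original script), you can also list the concrete subjects stored in a given partition along with their message counts:
nats stream subjects events-0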
Let’s purge the streams and try the benchmark again with an async batch of 100.
for i in $(seq 0 4); do
nats stream purge -f "events-$i" > /dev/null
done
nats bench \
--js \
--multisubject \
--pub 1 \
--pubbatch 100 \
--msgs 200000 \
--no-progress \
--stream "events-0" \
"events"
As expected, the throughput increases.
nats stream report
Purging one more time, we will also increase the number of concurrent publishers to 5.
for i in $(seq 0 4); do
nats stream purge -f "events-$i" > /dev/null
done
nats bench \
--js \
--multisubject \
--pub 5 \
--pubbatch 100 \
--msgs 200000 \
--no-progress \
--stream "events-0" \
"events"
The throughput increases a bit more. Do note that throughput and latency will depend heavily on the quality of the network and the resources available on the servers hosting the streams.
Note: If you are looking at the output on NATS by Example and are unimpressed with the numbers, do know this example ran in a GitHub Actions runner, which is fairly low-resourced, having 2 CPU cores and 7 GB of memory. Given that this example sets up three servers with five streams, all of which are replicated, as well as running the CLI for benchmarking (which itself takes resources), true performance will suffer in comparison to a production setup.
nats stream report
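Although this example focuses on scaling out publishes, consumption scales out the same way: attach one consumer (or a group of consumers) per partition stream. A rough, hedged sketch using the same bench tool with pull consumers is shown below; per-partition message counts vary slightly, so the count used here is deliberately conservative:
for i in $(seq 0 4); do
  nats bench --js --pull --sub 1 --msgs 30000 --no-progress \
    --stream "events-$i" "events.*.$i" &
done
wait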
Output
Obtaining Stream stats

╭──────────────────────────────────────────────────────────────────────────────────────────────╮
│                                        Stream Report                                          │
├──────────┬─────────┬───────────┬───────────┬──────────┬───────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼───────┼──────┼─────────┼─────────────┤
│ events-0 │ File    │           │         0 │        0 │   0 B │    0 │       0 │ n1*, n2, n3 │
│ events-1 │ File    │           │         0 │        0 │   0 B │    0 │       0 │ n1, n2*, n3 │
│ events-2 │ File    │           │         0 │        0 │   0 B │    0 │       0 │ n1, n2*, n3 │
│ events-3 │ File    │           │         0 │        0 │   0 B │    0 │       0 │ n1*, n2, n3 │
│ events-4 │ File    │           │         0 │        0 │   0 B │    0 │       0 │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴───────┴──────┴─────────┴─────────────╯

13:38:28 Starting JetStream benchmark [subject=events.*, multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=1, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=true, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:28 Starting publisher, publishing 200,000 messages

Pub stats: 7,152 msgs/sec ~ 894.05 KB/sec

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                           │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │         0 │   39,960 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-1 │ File    │           │         0 │   39,968 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-0 │ File    │           │         0 │   39,973 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-4 │ File    │           │         0 │   40,045 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-3 │ File    │           │         0 │   40,054 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯

13:38:56 Starting JetStream benchmark [subject=events.*, multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=1, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:56 Starting publisher, publishing 200,000 messages

Pub stats: 74,794 msgs/sec ~ 9.13 MB/sec

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                           │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │         0 │   39,960 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-1 │ File    │           │         0 │   39,968 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-0 │ File    │           │         0 │   39,973 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-4 │ File    │           │         0 │   40,045 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-3 │ File    │           │         0 │   40,054 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯

13:38:59 Starting JetStream benchmark [subject=events.*, multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=5, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages

Pub stats: 92,590 msgs/sec ~ 11.30 MB/sec
 [1] 22,279 msgs/sec ~ 2.72 MB/sec (40000 msgs)
 [2] 18,853 msgs/sec ~ 2.30 MB/sec (40000 msgs)
 [3] 18,660 msgs/sec ~ 2.28 MB/sec (40000 msgs)
 [4] 18,546 msgs/sec ~ 2.26 MB/sec (40000 msgs)
 [5] 18,518 msgs/sec ~ 2.26 MB/sec (40000 msgs)
 min 18,518 | avg 19,371 | max 22,279 | stddev 1,458 msgs

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                           │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │         0 │   39,960 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-1 │ File    │           │         0 │   39,968 │ 6.6 MiB │    0 │       0 │ n1, n2*, n3 │
│ events-0 │ File    │           │         0 │   39,973 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-4 │ File    │           │         0 │   40,045 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
│ events-3 │ File    │           │         0 │   40,054 │ 6.6 MiB │    0 │       0 │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯