NATS by Example

Subject-Mapped Partitions in JetStream

A stream provides total order over the set of messages it stores. If configured with replicas, each stream has a dedicated Raft group to ensure every message published to the stream is replicated to a majority of the servers hosting a replica. On a successful publish, every replica of the stream is guaranteed to have the same order of messages. Similarly, consumers on that stream will always receive the messages in the order they are stored.

This single, serialized stream is highly performant; however, for some use cases there may be a need to scale out message publishing or consumption. This can be achieved by creating N streams, each of which represents a partition of the total set of messages. The trade-off is supporting concurrent publishes across N streams while giving up total order across them.

However, we can still ensure deterministic distribution of messages to a given stream by using a subject-mapping feature called deterministic subject token partitioning.

This example shows how to configure the subject mapping and the streams, and demonstrates messages being transparently routed and written to each stream partition.

As a reminder, when working with subject mappings, the NATS CLI provides a command to test them. It takes the source subject, the destination subject, and one or more subjects to test.

$ nats server mapping "events.*" "events.{{wildcard(1)}}.{{partition(5,1)}}" "events.1"
events.1.4
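
Since each token value hashes to a fixed partition, re-running the test with the same subject always yields the same result: events.1 will always map to events.1.4. The command also accepts several test subjects at once, which makes it easy to see how different token values spread across the five partitions (the exact assignment per value depends on the hash):

$ nats server mapping "events.*" "events.{{wildcard(1)}}.{{partition(5,1)}}" "events.1" "events.2" "events.3"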

Run this example yourself with the nbe CLI:

$ nbe run jetstream/partitions/cli

Code

#!/bin/sh


set -euo pipefail


Unset NATS_URL if it is set in the environment, since this example runs its own cluster and defines a CLI context below.

unset NATS_URL

Define an accounts.conf file which is included in each of the individual node configs.

Notice the mappings option defined on the APP account, which takes a published message such as events.1 and maps it to events.1.4, where the last token indicates the deterministic partition number. In this case, the second token, 1, is hashed to a partition between 0 and 4 (five total partitions).

Note: you can use more than one token when defining the partitioning. If you require a > wildcard, then you need at least one * token for the partitioning. For example: "events.*.>": "events.{{partition(5,1)}}.{{wildcard(1)}}.>"

cat <<- EOF > accounts.conf
accounts: {
  SYS: {
    users: [{user: sys, password: sys}]
  }


  APP: {
    jetstream: true,
    users: [{user: app, password: app}]
    mappings: {
      "events.*": "events.{{wildcard(1)}}.{{partition(5,1)}}"
    }
  }
}


system_account: SYS
EOF

Declare the configuration for each node, including the shared accounts configuration. Note: if decentralized auth is being used, mappings can be defined on the account JWTs via the nsc tool.

cat <<- EOF > n1.conf
port: 4222
http_port: 8222
server_name: n1


include accounts.conf


jetstream: {
  store_dir: "./n1"
}


cluster: {
  name: c1,
  port: 6222,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF


cat <<- EOF > n2.conf
port: 4223
http_port: 8223
server_name: n2


include accounts.conf


jetstream: {
  store_dir: "./n2"
}


cluster: {
  name: c1,
  port: 6223,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF


cat <<- EOF > n3.conf
port: 4224
http_port: 8224
server_name: n3


include accounts.conf


jetstream: {
  store_dir: "./n3"
}


cluster: {
  name: c1,
  port: 6224,
  routes: [
    "nats-route://0.0.0.0:6222",
    "nats-route://0.0.0.0:6223",
    "nats-route://0.0.0.0:6224",
  ],
}
EOF



Start the cluster and wait a few seconds for a leader to be elected.

nats-server -c n1.conf > /dev/null 2>&1 &
nats-server -c n2.conf > /dev/null 2>&1 &
nats-server -c n3.conf > /dev/null 2>&1 &


sleep 3
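
As a quick check added on top of the original script, use the system account to confirm that all three nodes have joined the cluster.

nats server list --server nats://localhost:4222 --user sys --password sys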

Save and select the default context for convenience.

nats context save \
  --server "nats://localhost:4222,nats://localhost:4223,nats://localhost:4224" \
  --user app \
  --password app \
  default > /dev/null


nats context select default > /dev/null
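
Another optional check added here: confirm the selected context can connect and that JetStream is available to the APP account.

nats account info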

Create five streams modeling the partitions. Note the --subjects option corresponds to the subject mapping we defined above: events.*.0, events.*.1, etc.

for i in $(seq 0 4); do
  nats stream add \
    --retention=limits \
    --storage=file \
    --replicas=3 \
    --discard=old \
    --dupe-window=2m \
    --max-age=-1 \
    --max-msgs=-1 \
    --max-bytes=-1 \
    --max-msg-size=-1 \
    --max-msgs-per-subject=-1 \
    --max-consumers=-1 \
    --allow-rollup \
    --no-deny-delete \
    --no-deny-purge \
    --subjects="events.*.$i" \
    "events-$i" > /dev/null
done
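
Optionally (this step is an addition to the original example), inspect one of the partitions to confirm its subject filter matches the mapping, e.g. events.*.0 for the events-0 stream.

nats stream info events-0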

Report the streams to confirm they are all present.

nats stream report

Run a benchmark with one publisher using synchronous acks. The --multisubject option appends the message sequence number as a token to the base subject, events in this case. The --stream option simply overrides the default stream name used by the bench command and does not impact publishing behavior.

nats bench \
  --js \
  --multisubject \
  --pub 1 \
  --msgs 200000 \
  --syncpub \
  --no-progress \
  --stream "events-0" \
  "events"

As expected, the throughput is quite low due to the synchronous publishes. However, looking at the stream report again, we see the messages have been distributed across the streams.

nats stream report
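
As an extra illustration not in the original example, we can route a single message and observe where it lands. Publishing with a request returns the JetStream acknowledgement, which names the stream (partition) that stored the message.

nats req "events.12345" "hello"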

Let’s purge the streams and try the benchmark again with an async batch of 100.

for i in $(seq 0 4); do
  nats stream purge -f "events-$i" > /dev/null
done


nats bench \
  --js \
  --multisubject \
  --pub 1 \
  --pubbatch 100 \
  --msgs 200000 \
  --no-progress \
  --stream "events-0" \
  "events"

As expected, the throughput increases.

nats stream report

Purging one more time, we will also increase the number of concurrent publishers to 5.

for i in $(seq 0 4); do
  nats stream purge -f "events-$i" > /dev/null
done


nats bench \
  --js \
  --multisubject \
  --pub 5 \
  --pubbatch 100 \
  --msgs 200000 \
  --no-progress \
  --stream "events-0" \
  "events"

The throughput increases a bit more. Do note that throughput and latency depend heavily on the quality of the network and the resources available on the servers hosting the streams.

Note: if you are looking at the output on NATS by Example and are unimpressed with the numbers, keep in mind that this example ran in a GitHub Actions runner, which is fairly low-resourced, having 2 CPU cores and 7 GB of memory. Given that this example sets up three servers with five replicated streams, and also runs the CLI for benchmarking (which itself takes up resources), performance will suffer in comparison to a production setup.

nats stream report
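
The same approach scales out consumption: attach a consumer to each partition stream and process them independently. The following is a minimal sketch added as an illustration; it assumes a natscli version that supports the --defaults and --pull flags on consumer add.

for i in $(seq 0 4); do
  nats consumer add --pull --defaults "events-$i" "processor-$i" > /dev/null
done

nats consumer report events-0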

Output

Obtaining Stream stats

╭──────────────────────────────────────────────────────────────────────────────────────────────╮
│                                        Stream Report                                         │
├──────────┬─────────┬───────────┬───────────┬──────────┬───────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼───────┼──────┼─────────┼─────────────┤
│ events-0 │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ n1*, n2, n3 │
│ events-1 │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ n1, n2*, n3 │
│ events-2 │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ n1, n2*, n3 │
│ events-3 │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ n1*, n2, n3 │
│ events-4 │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴───────┴──────┴─────────┴─────────────╯

13:38:28 Starting JetStream benchmark [subject=events.*,  multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=1, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=true, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:28 Starting publisher, publishing 200,000 messages

Pub stats: 7,152 msgs/sec ~ 894.05 KB/sec

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                          │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │ 0         │ 39,960   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-1 │ File    │           │ 0         │ 39,968   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-0 │ File    │           │ 0         │ 39,973   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-4 │ File    │           │ 0         │ 40,045   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-3 │ File    │           │ 0         │ 40,054   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯

13:38:56 Starting JetStream benchmark [subject=events.*,  multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=1, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:56 Starting publisher, publishing 200,000 messages

Pub stats: 74,794 msgs/sec ~ 9.13 MB/sec

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                          │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │ 0         │ 39,960   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-1 │ File    │           │ 0         │ 39,968   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-0 │ File    │           │ 0         │ 39,973   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-4 │ File    │           │ 0         │ 40,045   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-3 │ File    │           │ 0         │ 40,054   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯

13:38:59 Starting JetStream benchmark [subject=events.*,  multisubject=true, multisubjectmax=0, js=true, msgs=200,000, msgsize=128 B, pubs=5, subs=0, stream=events-0, maxbytes=1.0 GiB, syncpub=false, pubbatch=100, jstimeout=30s, pull=false, consumerbatch=100, push=false, consumername=natscli-bench, purge=false, pubsleep=0s, subsleep=0s, deduplication=%!c(bool=false), dedupwindow=2m0s]
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages
13:38:59 Starting publisher, publishing 40,000 messages

Pub stats: 92,590 msgs/sec ~ 11.30 MB/sec
 [1] 22,279 msgs/sec ~ 2.72 MB/sec (40000 msgs)
 [2] 18,853 msgs/sec ~ 2.30 MB/sec (40000 msgs)
 [3] 18,660 msgs/sec ~ 2.28 MB/sec (40000 msgs)
 [4] 18,546 msgs/sec ~ 2.26 MB/sec (40000 msgs)
 [5] 18,518 msgs/sec ~ 2.26 MB/sec (40000 msgs)
 min 18,518 | avg 19,371 | max 22,279 | stddev 1,458 msgs

Obtaining Stream stats

╭────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                          │
├──────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬─────────────┤
│ Stream   │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas    │
├──────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼─────────────┤
│ events-2 │ File    │           │ 0         │ 39,960   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-1 │ File    │           │ 0         │ 39,968   │ 6.6 MiB │ 0    │ 0       │ n1, n2*, n3 │
│ events-0 │ File    │           │ 0         │ 39,973   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-4 │ File    │           │ 0         │ 40,045   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
│ events-3 │ File    │           │ 0         │ 40,054   │ 6.6 MiB │ 0    │ 0       │ n1*, n2, n3 │
╰──────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴─────────────╯
