NATS by Example

Work-queue Stream in JetStream

A work-queue retention policy satisfies a very common use case of queuing up messages that are intended to be processed once and only once.

This retention policy supports queuing up messages from publishers independent of consumption. Since each message is intended to be processed only once, this retention type allows for a set of consumers that have non-overlapping interest on subjects. In other words, if multiple consumers are bound to a work-queue stream, they must have disjoint filter subjects. This is in contrast to a standard limits-based or interest-based stream, which supports multiple consumers with overlapping interest.
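
To make "disjoint filter subjects" concrete, here is a minimal sketch of an overlap check between two subject filters, using the standard NATS wildcards (`*` for exactly one token, `>` for one or more trailing tokens). The `filters_overlap?` helper is hypothetical and written only for illustration; it is not part of the NATS client and is not how the server itself validates consumers.

```crystal
# Hypothetical helper (not part of the NATS client): returns true when two
# subject filters could both match at least one common subject.
def filters_overlap?(a, b)
  ta = a.split('.')
  tb = b.split('.')
  i = 0
  loop do
    # Both filters end at the same position: they match the same subject.
    return true if i == ta.size && i == tb.size
    # Only one filter has ended: the other still requires more tokens.
    return false if i == ta.size || i == tb.size
    # `>` absorbs all remaining tokens of the other filter (at least one is left).
    return true if ta[i] == ">" || tb[i] == ">"
    # Tokens must be equal unless one side is the single-token wildcard `*`.
    return false unless ta[i] == "*" || tb[i] == "*" || ta[i] == tb[i]
    i += 1
  end
end

puts filters_overlap?("events.us.>", "events.eu.>")          # => false
puts filters_overlap?("events.>", "events.us.>")             # => true
puts filters_overlap?("events.*.page_loaded", "events.us.>") # => true
```

Under this model, a consumer with no filter behaves like a filter of `>`, which overlaps with every other filter. That is why an unfiltered consumer has to be the only consumer on a work-queue stream.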

Like the interest policy, this retention policy is additive to any limits set on the stream. As a contrived example, if max-msgs is set to one with old messages being discarded, every new message received by the stream will cause the prior message to be deleted, regardless of whether any subscriptions were available to process it.

In this example, we will walk through the work-queue retention setup and behavior. If you are new to streams, it is recommended to read the limits-based stream example prior to reading this one.

$ nbe run jetstream/workqueue-stream/crystal

Code

require "nats"
require "nats/jetstream"

Get the NATS_URL from the environment or fall back to the default. This can be a comma-separated string. We convert it to an Array(URI) to pass to the NATS client.

servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
  .split(',')
  .map { |url| URI.parse(url) }

Create a client connection to an available NATS server.

nats = NATS::Client.new(servers)

When the program exits, we close the NATS client which waits for any pending messages (published or in a subscription) to be flushed.

at_exit { nats.close }


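Get the JetStream client from the connection. It is used below to manage streams and consumers and to publish messages.

js = nats.jetstream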

Create the stream

Define the stream configuration, specifying :workqueue for the retention policy.

stream = js.stream.create(
  name: "EVENTS",
  retention: :workqueue,
  subjects: %w[events.>],
  storage: :file,
)
puts "Created the stream"

Queue messages

Publish a few messages.

js.publish "events.us.page_loaded", ""
js.publish "events.eu.mouse_clicked", ""
js.publish "events.us.input_focused", ""
puts "Published 3 messages"

Checking the stream info, we see three messages have been queued.

puts
puts "Stream info without any consumers:"
pp js.stream.info(stream.config.name).try(&.state)

Adding a consumer

Now let’s add a consumer and process the queued messages. It can be a push or pull consumer. For this example, we are defining a pull consumer.

consumer1 = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "processor-1",
)
sub1 = js.pull_subscribe(consumer1)

Fetch and ack the queued messages

msgs = sub1.fetch(3)
msgs.each { |msg| js.ack_sync msg }

Checking the stream info again, we see that no messages remain, because a work-queue stream deletes each message once it has been acknowledged.

puts "Stream info with 1 consumer:"
pp js.stream.info(stream.config.name).try(&.state)

Exclusive non-filtered consumer

As noted in the description above, work-queue streams can have at most one consumer with interest on a subject at any given time. Since the pull consumer above is not filtered, if we try to create another one, it will fail.

begin
  puts
  puts "Creating an overlapping consumer"
  consumer2 = js.consumer.create(
    stream_name: stream.config.name,
    durable_name: "processor-2",
  )
rescue ex
  puts "** #{ex}"
end

However, if we delete the first one, we can add the new one.

puts
puts "Deleting first consumer"
js.consumer.delete consumer1


puts "Creating second consumer:"
pp consumer2 = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "processor-2",
)
js.consumer.delete consumer2

Multiple filtered consumers

To create multiple consumers, a subject filter needs to be applied. For this example, we could scope each consumer to the geo that the event was published from, in this case us or eu.

puts
puts "Creating non-overlapping consumers"
consumer1 = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "processor-us",
  filter_subject: "events.us.>",
)
consumer2 = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "processor-eu",
  filter_subject: "events.eu.>",
)
sub1 = js.pull_subscribe(consumer1)
sub2 = js.pull_subscribe(consumer2)


Publish messages for both regions.

js.publish("events.eu.mouse_clicked", "")
js.publish("events.us.page_loaded", "")
js.publish("events.us.input_focused", "")
js.publish("events.eu.page_loaded", "")
puts "Published 4 messages"


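Fetch and ack two messages from each subscription. Each consumer receives only the messages that match its filter subject.

sub1.fetch(2).each do |msg|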
  puts "US subscription got: #{msg.subject}"
  js.ack msg
end
sub2.fetch(2).each do |msg|
  puts "EU subscription got: #{msg.subject}"
  js.ack msg
end

Output

Created the stream
Published 3 messages

Stream info without any consumers:
NATS::JetStream::API::V1::StreamState(
 @bytes=205,
 @consumer_count=0,
 @first_seq=1,
 @first_ts=2023-03-16 13:37:24.550721827 UTC,
 @last_seq=3,
 @last_ts=2023-03-16 13:37:24.551020697 UTC,
 @messages=3)
Stream info with 1 consumer:
NATS::JetStream::API::V1::StreamState(
 @bytes=0,
 @consumer_count=1,
 @first_seq=4,
 @first_ts=1970-01-01 00:00:00.0 UTC,
 @last_seq=3,
 @last_ts=2023-03-16 13:37:24.551020697 UTC,
 @messages=0)

Creating an overlapping consumer
** multiple non-filtered consumers not allowed on workqueue stream (400)

Deleting first consumer
Creating second consumer:
NATS::JetStream::API::V1::Consumer(
 @ack_floor=
  NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=0, @stream_seq=0),
 @cluster=nil,
 @config=
  NATS::JetStream::API::V1::ConsumerConfig(
   @ack_policy=Explicit,
   @ack_wait=00:00:30,
   @deliver_group="processor-2",
   @deliver_policy=All,
   @deliver_subject=nil,
   @description=nil,
   @durable_name="processor-2",
   @filter_subject=nil,
   @flow_control=false,
   @idle_heartbeat=nil,
   @max_ack_pending=1000,
   @max_deliver=-1,
   @max_waiting=512,
   @opt_start_seq=nil,
   @opt_start_time=nil,
   @rate_limit_bps=nil,
   @replay_policy=Instant,
   @sample_frequency=nil),
 @created=2023-03-16 13:37:24.555030218 UTC,
 @delivered=
  NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=0, @stream_seq=3),
 @name="processor-2",
 @num_ack_pending=0,
 @num_pending=0,
 @num_redelivered=0,
 @num_waiting=0,
 @push_bound=false,
 @stream_name="EVENTS")

Creating non-overlapping consumers
Published 4 messages
US subscription got: events.us.page_loaded
US subscription got: events.us.input_focused
EU subscription got: events.eu.mouse_clicked
EU subscription got: events.eu.page_loaded
