Work-queue Stream in JetStream
A work-queue retention policy satisfies a very common use case of queuing up messages that are intended to be processed once and only once.
This retention policy supports queuing up messages from publishers independent of consummption. Since each message is intended to be processed only once, this retention type allows for a set of consumers that have non-overlapping interest on subjects. In other words, if multiple consumers are bound to a work-queue stream, they must have disjoint filter subjects. This is in constrast to a standard limits-based or interest-based stream which supports multiple consumers with overlapping interest.
Like the interest policy this retention policy is additive
to any limits set on the stream. As a contrived example, if max-msgs
is set to one with old messages being discarded, every new message that
is received by the stream will result in the prior message being deleted
regardless if any subscriptions were available to process the message.
In this example, we will walk through the work-queue retention setup and behavior. If you are new to streams, it is recommended to read the limits-based stream example prior to reading this one.
$ nbe run jetstream/workqueue-stream/crystalView the source code or learn how to run this example yourself
Code
require "nats"
require "nats/jetstream"
Get the NATS_URL
from the environment or fallback to the default. This can
be a comma-separated string. We convert it to an Array(URI)
to pass to the
NATS client.
servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
.split(',')
.map { |url| URI.parse(url) }
Create a client connection to an available NATS server.
nats = NATS::Client.new(servers)
When the program exits, we close the NATS client which waits for any pending messages (published or in a subscription) to be flushed.
at_exit { nats.close }
js = nats.jetstream
Create the stream
Define the stream configuration, specifying :workqueue
for the retention
policy.
stream = js.stream.create(
name: "EVENTS",
retention: :workqueue,
subjects: %w[events.>],
storage: :file,
)
puts "Created the stream"
Queue messages
Publish a few messages.
js.publish "events.us.page_loaded", ""
js.publish "events.eu.mouse_clicked", ""
js.publish "events.us.input_focused", ""
puts "Published 3 messages"
Checking the stream info, we see three messages have been queued.
puts
puts "Stream info without any consumers:"
pp js.stream.info(stream.config.name).try(&.state)
Adding a consumer
Now let’s add a consumer and publish a few more messages. It can be a push or pull consumer. For this example, we are defining a pull consumer.
consumer1 = js.consumer.create(
stream_name: stream.config.name,
durable_name: "processor-1",
)
sub1 = js.pull_subscribe(consumer1)
Fetch and ack the queued messages
msgs = sub1.fetch(3)
msgs.each { |msg| js.ack_sync msg }
Checking the stream info again, we will notice no messages are available.
puts "Stream info with 1 consumer:"
pp js.stream.info(stream.config.name).try(&.state)
Exclusive non-filtered consumer
As noted in the description above, work-queue streams can only have at most one consumer with interest on a subject at any given time. Since the pull consumer above is not filtered, if we try to create another one, it will fail.
begin
puts
puts "Creating an overlapping consumer"
consumer2 = js.consumer.create(
stream_name: stream.config.name,
durable_name: "processor-2",
)
rescue ex
puts "** #{ex}"
end
However, if we delete the first one, we can add the new one.
puts
puts "Deleting first consumer"
js.consumer.delete consumer1
puts "Creating second consumer:"
pp consumer2 = js.consumer.create(
stream_name: stream.config.name,
durable_name: "processor-2",
)
js.consumer.delete consumer2
Multiple filtered consumers
To create multiple consumers, a subject filter needs to be applied. For this
example, we could scope each consumer to the geo that the event was published
from, in this case us
or eu
.
puts
puts "Creating non-overlapping consumers"
consumer1 = js.consumer.create(
stream_name: stream.config.name,
durable_name: "processor-us",
filter_subject: "events.us.>",
)
consumer2 = js.consumer.create(
stream_name: stream.config.name,
durable_name: "processor-eu",
filter_subject: "events.eu.>",
)
sub1 = js.pull_subscribe(consumer1)
sub2 = js.pull_subscribe(consumer2)
js.publish("events.eu.mouse_clicked", "")
js.publish("events.us.page_loaded", "")
js.publish("events.us.input_focused", "")
js.publish("events.eu.page_loaded", "")
puts "Published 4 messages"
sub1.fetch(2).each do |msg|
puts "US subscription got: #{msg.subject}"
js.ack msg
end
sub2.fetch(2).each do |msg|
puts "EU subscription got: #{msg.subject}"
js.ack msg
end
Output
Created the stream Published 3 messages Stream info without any consumers: NATS::JetStream::API::V1::StreamState( @bytes=205, @consumer_count=0, @first_seq=1, @first_ts=2023-03-16 13:37:24.550721827 UTC, @last_seq=3, @last_ts=2023-03-16 13:37:24.551020697 UTC, @messages=3) Stream info with 1 consumer: NATS::JetStream::API::V1::StreamState( @bytes=0, @consumer_count=1, @first_seq=4, @first_ts=1970-01-01 00:00:00.0 UTC, @last_seq=3, @last_ts=2023-03-16 13:37:24.551020697 UTC, @messages=0) Creating an overlapping consumer ** multiple non-filtered consumers not allowed on workqueue stream (400) Deleting first consumer Creating second consumer: NATS::JetStream::API::V1::Consumer( @ack_floor= NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=0, @stream_seq=0), @cluster=nil, @config= NATS::JetStream::API::V1::ConsumerConfig( @ack_policy=Explicit, @ack_wait=00:00:30, @deliver_group="processor-2", @deliver_policy=All, @deliver_subject=nil, @description=nil, @durable_name="processor-2", @filter_subject=nil, @flow_control=false, @idle_heartbeat=nil, @max_ack_pending=1000, @max_deliver=-1, @max_waiting=512, @opt_start_seq=nil, @opt_start_time=nil, @rate_limit_bps=nil, @replay_policy=Instant, @sample_frequency=nil), @created=2023-03-16 13:37:24.555030218 UTC, @delivered= NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=0, @stream_seq=3), @name="processor-2", @num_ack_pending=0, @num_pending=0, @num_redelivered=0, @num_waiting=0, @push_bound=false, @stream_name="EVENTS") Creating non-overlapping consumers Published 4 messages US subscription got: events.us.page_loaded US subscription got: events.us.input_focused EU subscription got: events.eu.mouse_clicked EU subscription got: events.eu.page_loaded