NATS by Example

Pull Consumers in JetStream

A pull consumer lets the application fetch one or more messages on demand using a subscription bound to the consumer. The application therefore controls the flow of incoming messages, so it can process and ack them in an appropriate amount of time.

A consumer can either be durable or ephemeral. A durable consumer will have its state tracked on the server, most importantly, the last acknowledged message from the client.

Ephemeral consumers are useful for one-off needs and are a bit cheaper in terms of resources and management. However, ephemerals do not persist after the primary subscriber unsubscribes: the server automatically cleans up (deletes) the consumer after a period of time.

Since each subscription fetches messages on demand, multiple subscriptions can be created and bound to the same pull consumer without any additional configuration. Each subscriber can fetch batches of messages and process them concurrently.

It is important to note that the messages in a given batch are ordered with respect to each other, but each subscriber handles its batch independently. If you need deterministic partitioning for scalable ordered processing, see the NATS documentation on deterministic subject token partitioning.
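As a rough sketch of the point above, two pull subscriptions can be bound to one durable consumer and drained from separate fibers. It reuses only the client calls that appear in the walkthrough later in this example. The stream name SKETCH, the sketch.> subjects, the consumer name SKETCH-pull, the batch size, and the timeout are made up for illustration. The msg.subject getter and an empty array being returned when nothing is left are assumptions, and a JetStream-enabled NATS server must be reachable.

```crystal
require "nats"
require "nats/jetstream"

# Assumes a JetStream-enabled server; all SKETCH names are hypothetical.
servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
  .split(',')
  .map { |url| URI.parse(url) }
nats = NATS::Client.new(servers)
at_exit { nats.close }
js = nats.jetstream

stream = js.stream.create(
  name: "SKETCH",
  subjects: %w[sketch.>],
  storage: :file,
)
6.times { |i| js.publish "sketch.#{i + 1}", "" }

consumer = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "SKETCH-pull",
)

# Each worker reports how many messages it handled when it finishes.
done = Channel(Int32).new

2.times do |worker|
  # Every worker gets its own subscription bound to the same consumer.
  pull = js.pull_subscribe(consumer)
  spawn do
    count = 0
    loop do
      msgs = pull.fetch(2, timeout: 1.second)
      break if msgs.empty? # assumed: an empty batch means nothing is left
      msgs.each do |msg|
        puts "worker #{worker} got #{msg.subject}" # assumed getter
        js.ack msg
        count += 1
      end
    end
    done.send count
  end
end

total = 0
2.times { total += done.receive }
puts "handled #{total} messages in total"
```

Within each batch the subjects should appear in publish order, but how the batches are split between the two workers can differ from run to run.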

$ nbe run jetstream/pull-consumer/crystal

Code

require "nats"
require "nats/jetstream"

Get the NATS_URL from the environment or fall back to the default. This can be a comma-separated string. We convert it to an Array(URI) to pass to the NATS client.

servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
  .split(',')
  .map { |url| URI.parse(url) }
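To see what this conversion produces, here is a small sketch that runs the same split-and-parse steps on a hard-coded two-server string instead of the environment. The host names are placeholders, and the strip call is an addition (not part of the example) that tolerates spaces after the commas.

```crystal
require "uri"

# Hypothetical value; in the example this comes from ENV["NATS_URL"].
raw = "nats://a.example.com:4222, nats://b.example.com:4222"

servers = raw
  .split(',')
  .map { |url| URI.parse(url.strip) }

# Print the parts of each parsed URI.
servers.each do |uri|
  puts "#{uri.scheme} #{uri.host} #{uri.port}"
end
```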

Create a client connection to an available NATS server.

nats = NATS::Client.new(servers)

When the program exits, we close the NATS client, which waits for any pending messages (published or in a subscription) to be flushed.

at_exit { nats.close }


js = nats.jetstream

Here we create the EVENTS stream, which listens on all subjects matching events.> (all subjects starting with events.) and is stored on the filesystem for durability. The call shown here sets no message limit, so the server defaults apply. If a limit is configured, the default discard policy discards the oldest messages once that limit is exceeded.

stream = js.stream.create(
  name: "EVENTS",
  subjects: %w[events.>],
  storage: :file,
)

We’re going to publish 5 messages whose subjects match the subjects list for our stream, so they will be inserted into the stream.

5.times do |i|
  puts "Publishing events.#{i + 1}"
  js.publish "events.#{i + 1}", ""
end

We can confirm that we inserted 5 events into the stream by fetching the latest stream info and inspecting its state.

if stream = js.stream.info(stream.config.name)
  pp stream.state
else
  raise "Could not fetch stream"
end

Here we create the consumer we’re going to use for our pull subscriptions. We set the durable_name to ensure the consumer will be persisted if the NATS server goes offline.

consumer = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "EVENTS-pull",
)

Next we create the pull subscription to the consumer.

pull = js.pull_subscribe(consumer)

We can use the fetch method with no arguments to fetch a single message. We can supply a timeout argument to tell NATS how long we want to wait; the default is 2 seconds. We also make sure to acknowledge the message with the ack method.

if msg = pull.fetch
  puts "Got a single message"
  pp msg
  js.ack msg
else
  puts "No messages in the queue"
end

We can also fetch a batch by passing a batch size to fetch, which returns up to that many messages. If any messages are available, up to that many are returned immediately. The call waits for the timeout only when there are no messages.

msgs = pull.fetch(3)
puts "got #{msgs.size} messages"
msgs.each do |msg|
  puts msg
  js.ack msg
end
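The timing behavior described above can be observed with a short experiment: time one fetch while messages are waiting, then time another after the consumer has been drained. This is only a sketch under stated assumptions. The TIMING stream, its timing.> subjects, and the TIMING-pull consumer are hypothetical names chosen to avoid clashing with EVENTS. It assumes that an empty batch comes back as an empty array once the timeout passes, and it needs a reachable JetStream-enabled server.

```crystal
require "nats"
require "nats/jetstream"

# Assumes a JetStream-enabled server; all TIMING names are hypothetical.
servers = ENV.fetch("NATS_URL", "nats://localhost:4222")
  .split(',')
  .map { |url| URI.parse(url) }
nats = NATS::Client.new(servers)
at_exit { nats.close }
js = nats.jetstream

stream = js.stream.create(
  name: "TIMING",
  subjects: %w[timing.>],
  storage: :file,
)
2.times { |i| js.publish "timing.#{i + 1}", "" }

consumer = js.consumer.create(
  stream_name: stream.config.name,
  durable_name: "TIMING-pull",
)
pull = js.pull_subscribe(consumer)

# Messages are waiting, so this should return well before the timeout.
ready = 0
fast = Time.measure do
  msgs = pull.fetch(5, timeout: 2.seconds)
  msgs.each { |msg| js.ack msg }
  ready = msgs.size
end
puts "got #{ready} messages in #{fast.total_milliseconds.round} ms"

# Nothing is left, so this call should wait for roughly the full timeout.
empty = 0
slow = Time.measure do
  empty = pull.fetch(5, timeout: 2.seconds).size
end
puts "got #{empty} messages in #{slow.total_milliseconds.round} ms"
```

If the behavior matches the description, the first call reports both messages after a few milliseconds and the second reports zero messages after about two seconds.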

This example demonstrates requesting more messages than are available. We specify a timeout of 1 second, but since 1 message remains, fetch returns that message immediately. We also use ack_sync to acknowledge the messages in this batch, which waits for the server to confirm each acknowledgement.

msgs = pull.fetch(100, timeout: 1.second)
puts "got #{msgs.size} messages"
msgs.each do |msg|
  puts msg
  js.ack_sync msg
end

We can check the consumer’s current state with the nats.jetstream.consumer.info method, passing in the names of the stream and the consumer. In the output, we see that we have acknowledged all 5 messages we inserted into the stream above: the ack_floor is at sequence 5 and num_ack_pending is 0.

pp js.consumer.info(consumer.stream_name, consumer.name)

Output

Publishing events.1
Publishing events.2
Publishing events.3
Publishing events.4
Publishing events.5
NATS::JetStream::API::V1::StreamState(
 @bytes=270,
 @consumer_count=0,
 @first_seq=1,
 @first_ts=2023-03-16 13:37:32.713199081 UTC,
 @last_seq=5,
 @last_ts=2023-03-16 13:37:32.713855374 UTC,
 @messages=5)
Got a single message
NATS::JetStream::Message(
 @body=Bytes[],
 @consumer="EVENTS-pull",
 @consumer_seq=1,
 @delivered_count=1,
 @headers={},
 @pending=4,
 @reply_to="$JS.ACK.EVENTS.EVENTS-pull.1.1.1.1678973852713199081.4",
 @stream="EVENTS",
 @stream_seq=1,
 @subject="events.1",
 @timestamp=2023-03-16 13:37:32.713199081 UTC)
got 3 messages
NATS::JetStream::Message(@stream="EVENTS", @consumer="EVENTS-pull", @delivered_count=1, @stream_seq=2, @consumer_seq=2, @timestamp=2023-03-16 13:37:32.713372825 UTC, @pending=3, @body=Bytes[], @subject="events.2", @reply_to="$JS.ACK.EVENTS.EVENTS-pull.1.2.2.1678973852713372825.3", @headers={})
NATS::JetStream::Message(@stream="EVENTS", @consumer="EVENTS-pull", @delivered_count=1, @stream_seq=3, @consumer_seq=3, @timestamp=2023-03-16 13:37:32.713497574 UTC, @pending=2, @body=Bytes[], @subject="events.3", @reply_to="$JS.ACK.EVENTS.EVENTS-pull.1.3.3.1678973852713497574.2", @headers={})
NATS::JetStream::Message(@stream="EVENTS", @consumer="EVENTS-pull", @delivered_count=1, @stream_seq=4, @consumer_seq=4, @timestamp=2023-03-16 13:37:32.713725864 UTC, @pending=1, @body=Bytes[], @subject="events.4", @reply_to="$JS.ACK.EVENTS.EVENTS-pull.1.4.4.1678973852713725864.1", @headers={})
got 1 messages
NATS::JetStream::Message(@stream="EVENTS", @consumer="EVENTS-pull", @delivered_count=1, @stream_seq=5, @consumer_seq=5, @timestamp=2023-03-16 13:37:32.713855374 UTC, @pending=0, @body=Bytes[], @subject="events.5", @reply_to="$JS.ACK.EVENTS.EVENTS-pull.1.5.5.1678973852713855374.0", @headers={})
NATS::JetStream::API::V1::Consumer(
 @ack_floor=
  NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=5, @stream_seq=5),
 @cluster=nil,
 @config=
  NATS::JetStream::API::V1::ConsumerConfig(
   @ack_policy=Explicit,
   @ack_wait=00:00:30,
   @deliver_group="EVENTS-pull",
   @deliver_policy=All,
   @deliver_subject=nil,
   @description=nil,
   @durable_name="EVENTS-pull",
   @filter_subject=nil,
   @flow_control=false,
   @idle_heartbeat=nil,
   @max_ack_pending=1000,
   @max_deliver=-1,
   @max_waiting=512,
   @opt_start_seq=nil,
   @opt_start_time=nil,
   @rate_limit_bps=nil,
   @replay_policy=Instant,
   @sample_frequency=nil),
 @created=2023-03-16 13:37:32.714494652 UTC,
 @delivered=
  NATS::JetStream::API::V1::Consumer::Sequence(@consumer_seq=5, @stream_seq=5),
 @name="EVENTS-pull",
 @num_ack_pending=0,
 @num_pending=0,
 @num_redelivered=0,
 @num_waiting=1,
 @push_bound=false,
 @stream_name="EVENTS")
