NATS by Example

Concurrent Message Processing in Messaging

By default, when a subscription is created, each received message is processed sequentially. Multiple subscriptions can be set up in a queue group, in which case the NATS server delivers each message to one member of the group.

However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.

$ nbe run messaging/concurrent/python

Code

import asyncio
import os
import random


import nats




async def main():

Use the NATS_URL environment variable if defined; otherwise fall back to the default.

    nats_url = os.getenv("NATS_URL", "nats://localhost:4222")


    client = await nats.connect(nats_url)

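subscription.messages is an asynchronous iterator, so we can receive the next message immediately instead of waiting for a time-consuming operation on the previous one to finish. The max_msgs=50 argument limits the subscription to 50 messages, after which the iterator ends.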

    messages = (await client.subscribe("greet.*", max_msgs=50)).messages

Publish a set of messages, each with an order identifier.

    for i in range(50):
        await client.publish("greet.joe", f"hello {i}".encode())

Process the messages concurrently. The semaphore limits the number of handlers running at the same time to 25.

    semaphore = asyncio.Semaphore(25)


    async def process_message(message):
        async with semaphore:
            await asyncio.sleep(random.uniform(0, 0.5))
            print(f"received message: {message.data.decode()!r}")


    await asyncio.gather(*[process_message(msg) async for msg in messages])




if __name__ == "__main__":
    asyncio.run(main())
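
The listing above collects every message from the iterator before any handler starts running. As a rough sketch of a variation, the following standard-library-only example starts a task as soon as each message arrives and records the peak number of handlers running at once. The names `fake_messages` and `run` and the counters are hypothetical, and the async generator merely stands in for `subscription.messages`; no NATS server is involved.

```python
import asyncio


async def fake_messages(count):
    # Hypothetical stand-in for subscription.messages: yields payloads one by one.
    for i in range(count):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield f"hello {i}".encode()


async def run(count=20, limit=5):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0  # handlers currently inside the semaphore
    peak = 0       # highest value in_flight ever reached
    handled = []

    async def process_message(data):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated work
            handled.append(data.decode())
            in_flight -= 1

    # Start a task as soon as each message arrives instead of collecting first.
    tasks = []
    async for data in fake_messages(count):
        tasks.append(asyncio.create_task(process_message(data)))
    await asyncio.gather(*tasks)
    return peak, handled


if __name__ == "__main__":
    peak, handled = asyncio.run(run())
    print(f"peak concurrency: {peak}, handled: {len(handled)}")
```

Because every handler increments the counter only while holding the semaphore, the recorded peak can never exceed the limit, which makes the cap easy to verify.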

Output

received message: 'hello 11'
received message: 'hello 13'
received message: 'hello 22'
received message: 'hello 8'
received message: 'hello 14'
received message: 'hello 24'
received message: 'hello 19'
received message: 'hello 16'
received message: 'hello 28'
received message: 'hello 21'
received message: 'hello 32'
received message: 'hello 25'
received message: 'hello 34'
received message: 'hello 10'
received message: 'hello 5'
received message: 'hello 36'
received message: 'hello 2'
received message: 'hello 23'
received message: 'hello 1'
received message: 'hello 42'
received message: 'hello 12'
received message: 'hello 9'
received message: 'hello 26'
received message: 'hello 45'
received message: 'hello 20'
received message: 'hello 15'
received message: 'hello 41'
received message: 'hello 18'
received message: 'hello 0'
received message: 'hello 7'
received message: 'hello 29'
received message: 'hello 27'
received message: 'hello 3'
received message: 'hello 4'
received message: 'hello 17'
received message: 'hello 30'
received message: 'hello 37'
received message: 'hello 6'
received message: 'hello 33'
received message: 'hello 40'
received message: 'hello 39'
received message: 'hello 31'
received message: 'hello 35'
received message: 'hello 44'
received message: 'hello 43'
received message: 'hello 49'
received message: 'hello 38'
received message: 'hello 46'
received message: 'hello 48'
received message: 'hello 47'
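
The output order differs from the publish order because each handler sleeps for a random time before printing. A minimal sketch of that effect, using fixed delays in place of `random.uniform` so the result is reproducible (`completion_order` and `handle` are illustrative names, not part of the example):

```python
import asyncio


async def completion_order(delays):
    """Return message indices in the order their handlers finish."""
    finished = []

    async def handle(index, delay):
        await asyncio.sleep(delay)  # simulated work of varying length
        finished.append(index)

    # All handlers are started together, in publish order.
    await asyncio.gather(*(handle(i, d) for i, d in enumerate(delays)))
    return finished


if __name__ == "__main__":
    # Message 0 takes the longest, so it finishes last.
    print(asyncio.run(completion_order([0.15, 0.05, 0.10])))  # prints [1, 2, 0]
```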
