Iterating Over Multiple Subscriptions in Messaging
Core NATS subscriptions support a flexible subject model with tokens and wildcards, but some cases still require setting up separate subscriptions (for example, a user wants transport.cars, transport.planes, and transport.ships, but not transport.spaceships).
This approach works and is performant, but it is not very convenient when reading the messages. This example shows how to achieve the required behaviour without sacrificing usability.
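To see why a single wildcard subscription does not fit this case, here is a minimal sketch of NATS-style subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The `SubjectMatch` class and its `matches` method are hypothetical helpers written for illustration; they are not part of the NATS Java client, and the server's real matcher handles more edge cases.

```java
import java.util.List;

public class SubjectMatch {
    // Hypothetical helper (not a NATS client API): returns true when
    // `subject` matches `pattern` under the NATS wildcard rules.
    public static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" must cover at least one remaining token.
                return i < s.length;
            }
            if (i >= s.length) {
                return false;
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false;
            }
        }
        // Without ">", both sides must have the same number of tokens.
        return p.length == s.length;
    }

    public static void main(String[] args) {
        List<String> wanted = List.of("transport.cars", "transport.planes", "transport.ships");
        String unwanted = "transport.spaceships";

        // A single wildcard subscription also matches the unwanted subject.
        System.out.println(matches("transport.*", unwanted)); // prints true

        // Separate literal subscriptions match only what was asked for.
        boolean leaked = wanted.stream().anyMatch(w -> matches(w, unwanted));
        System.out.println(leaked); // prints false
    }
}
```

Because `transport.*` would also deliver transport.spaceships, the only way to exclude it on the subscription side is to subscribe to each wanted subject individually.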
$ nbe run messaging/iterating-multiple-subscriptions/java
Code
package example;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

        // Initialize a connection to the server. Connection is AutoCloseable,
        // so try-with-resources closes it on exit.
        try (Connection nc = Nats.connect(natsURL)) {
            int total = 80;
            CountDownLatch latch = new CountDownLatch(total);

            // Create a message dispatcher. A dispatcher runs on its own thread
            // and receives incoming messages, for the subjects registered on it,
            // through a FIFO queue. For each message it takes from the queue, it
            // makes a blocking call to the MessageHandler passed to the
            // createDispatcher call.
            Dispatcher dispatcher = nc.createDispatcher((msg) -> {
                System.out.printf("Received %s: %s\n",
                    msg.getSubject(),
                    new String(msg.getData(), StandardCharsets.UTF_8));
                latch.countDown();
            });

            // Subscribe directly on the dispatcher for multiple subjects.
            dispatcher.subscribe("s1");
            dispatcher.subscribe("s2");
            dispatcher.subscribe("s3");
            dispatcher.subscribe("s4");

            // Publish total / 4 messages to each of the four subjects.
            for (int i = 0; i < total / 4; i++) {
                nc.publish("s1", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s2", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s3", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                nc.publish("s4", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
                Thread.sleep(100);
            }

            // Wait for the dispatcher thread to receive all the messages
            // before the program quits.
            latch.await();
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}
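The dispatcher description in the code above (one thread, one FIFO queue, one blocking handler call per message) can be modelled with the standard library alone. The sketch below is a simplified stand-in and not how the NATS client is implemented; `MiniDispatcher`, `deliver`, `stop`, and `run` are hypothetical names, and `deliver` plays the role of the server pushing a message to the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class MiniDispatcher {
    // One FIFO queue shared by every subject routed to this dispatcher.
    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public MiniDispatcher(Consumer<String[]> handler) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a message is available; the handler
                    // runs to completion before the next message is taken.
                    handler.accept(queue.take());
                }
            } catch (InterruptedException e) {
                // Interrupted by stop(): leave the loop.
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Stand-in for a message arriving on some subject.
    public void deliver(String subject, String payload) {
        queue.add(new String[] {subject, payload});
    }

    public void stop() {
        worker.interrupt();
    }

    // Delivers `rounds` messages per subject and returns them in handler order.
    public static List<String> run(int rounds, String... subjects) {
        List<String> seen = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(rounds * subjects.length);
        MiniDispatcher d = new MiniDispatcher(msg -> {
            seen.add(msg[0] + ":" + msg[1]);
            latch.countDown();
        });
        for (int i = 0; i < rounds; i++) {
            for (String s : subjects) {
                d.deliver(s, String.valueOf(i));
            }
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        d.stop();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(2, "s1", "s2")); // prints [s1:0, s2:0, s1:1, s2:1]
    }
}
```

Because a single thread drains a single queue, the handler sees messages from all subjects interleaved in arrival order, which is the property that makes reading from several subscriptions convenient. It also means a slow handler delays every subject that shares the dispatcher.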
Output
Received s1: 0
Received s2: 0
Received s3: 0
Received s4: 0
Received s1: 1
Received s2: 1
Received s3: 1
Received s4: 1
Received s1: 2
Received s2: 2
Received s3: 2
Received s4: 2
Received s1: 3
Received s2: 3
Received s3: 3
Received s4: 3
Received s1: 4
Received s2: 4
Received s3: 4
Received s4: 4
Received s1: 5
Received s2: 5
Received s3: 5
Received s4: 5
Received s1: 6
Received s2: 6
Received s3: 6
Received s4: 6
Received s1: 7
Received s2: 7
Received s3: 7
Received s4: 7
Received s1: 8
Received s2: 8
Received s3: 8
Received s4: 8
Received s1: 9
Received s2: 9
Received s3: 9
Received s4: 9
Received s1: 10
Received s2: 10
Received s3: 10
Received s4: 10
Received s1: 11
Received s2: 11
Received s3: 11
Received s4: 11
Received s1: 12
Received s2: 12
Received s3: 12
Received s4: 12
Received s1: 13
Received s2: 13
Received s3: 13
Received s4: 13
Received s1: 14
Received s2: 14
Received s3: 14
Received s4: 14
Received s1: 15
Received s2: 15
Received s3: 15
Received s4: 15
Received s1: 16
Received s2: 16
Received s3: 16
Received s4: 16
Received s1: 17
Received s2: 17
Received s3: 17
Received s4: 17
Received s1: 18
Received s2: 18
Received s3: 18
Received s4: 18
Received s1: 19
Received s2: 19
Received s3: 19
Received s4: 19
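The run above received all 80 messages, so the latch reached zero and the program exited. Core NATS delivers messages at most once, so if a message were dropped, an unbounded `latch.await()` would block forever. One possible variation, sketched here with only the standard library, is to wait with a timeout and report how many messages are still outstanding. The `BoundedWait` class and `awaitAll` method are hypothetical names, and the 200 ms timeout is an arbitrary value chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedWait {
    // Returns true if the latch reached zero within the timeout, false otherwise.
    public static boolean awaitAll(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the wait as incomplete.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown(); // only one of the two expected messages "arrives"

        boolean complete = awaitAll(latch, 200);
        System.out.println("complete=" + complete + ", missing=" + latch.getCount());
        // prints complete=false, missing=1
    }
}
```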