Messages are read or consumed from the Stream by Consumers. We support pull-based and push-based Consumers, and the example scenario has both; let's walk through them.
The NEW and DISPATCH Consumers are pull-based, meaning the services consuming data from them have to ask the system for the next available message. This means you can easily scale your services up by adding more workers and the messages will get spread across the workers based on their availability.
Pull-based Consumers are created the same way as push-based Consumers; you just don't specify a delivery target.
We have no Consumers yet, so let's add the NEW one:
I supply the --sample option on the CLI as it is not prompted for at present; everything else is prompted, and the CLI's help explains each option.
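A single non-interactive command that creates roughly this Consumer could look like the following; the flag names are taken from a recent natscli build and may differ between versions:

```
$ nats con add ORDERS NEW --filter ORDERS.received --ack explicit --pull --deliver all --max-deliver 20 --sample 100
```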
This is a pull-based Consumer (empty Delivery Target); it starts at the first available message and requires explicit acknowledgement of each and every message.
It only receives messages that originally entered the Stream on ORDERS.received. Remember the Stream subscribes to ORDERS.*; this lets us select a subset of the messages in the Stream.
A Maximum Delivery limit of 20 is set, meaning that if a message is not acknowledged it will be redelivered, but only up to this maximum number of total deliveries.
Again, this can all be done in a single CLI call; let's make the DISPATCH Consumer:
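A sketch of the equivalent one-shot command, assuming DISPATCH selects the ORDERS.processed subject as in the example scenario:

```
$ nats con add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --max-deliver 20 --sample 100
```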
Additionally, one can store the configuration in a JSON file; the format is the same as the output of $ nats con info ORDERS DISPATCH -j | jq .config:
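A sketch of what such a file could contain for DISPATCH; the field names follow the JetStream consumer configuration, but exact fields and defaults (for example ack_wait, in nanoseconds) depend on your server and CLI versions:

```
{
  "durable_name": "DISPATCH",
  "deliver_policy": "all",
  "filter_subject": "ORDERS.processed",
  "ack_policy": "explicit",
  "ack_wait": 30000000000,
  "max_deliver": 20,
  "replay_policy": "instant",
  "sample_freq": "100"
}
```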
Our MONITOR Consumer is push-based, requires no acknowledgement, only receives new messages and is not sampled.
Again you can do this with a single non-interactive command:
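A sketch, using monitor.ORDERS as an example delivery target; again, flag names may differ between natscli versions:

```
$ nats con add ORDERS MONITOR --ack none --target monitor.ORDERS --deliver new --replay instant --filter ''
```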
Additionally, one can store the configuration in a JSON file; the format is the same as the output of $ nats con info ORDERS MONITOR -j | jq .config:
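A sketch of the corresponding file for MONITOR, under the same caveats about field names and defaults:

```
{
  "durable_name": "MONITOR",
  "deliver_subject": "monitor.ORDERS",
  "deliver_policy": "new",
  "ack_policy": "none",
  "replay_policy": "instant"
}
```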
You can get a quick list of all the Consumers for a specific Stream:
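For our example Stream this would list NEW, DISPATCH and MONITOR:

```
$ nats con ls ORDERS
```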
All details for a Consumer can be queried; let's first look at a pull-based Consumer:
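For example, the following prints the DISPATCH Consumer's Configuration and State sections (output omitted here):

```
$ nats con info ORDERS DISPATCH
```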
More details about the State section will be shown later when discussing the ack models in depth.
The two numbers are not directly related: the Stream sequence is a pointer to the exact message in the Stream, while the Consumer sequence is an ever-increasing counter of the Consumer's delivery attempts.
So, for example, a Stream with 1 message in it would have a Stream sequence of 1, but if the Consumer attempted 10 deliveries of that message the Consumer sequence would be 10 or 11.
Pull-based Consumers require you to specifically ask for messages and acknowledge them. Typically you would do this with the client library's Request() feature, but the nats utility has a helper:
First, we ensure we have a message:
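For example, publish a couple of test messages onto the subject DISPATCH selects; the payloads are illustrative:

```
$ nats pub ORDERS.processed "order 1"
$ nats pub ORDERS.processed "order 2"
```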
We can now read them using nats:
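The helper requests the next message from the Consumer and, by default, acknowledges it:

```
$ nats con next ORDERS DISPATCH
```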
Consume another one:
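As before, this fetches and acknowledges the next pending message:

```
$ nats con next ORDERS DISPATCH
```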
You can prevent ACKs by supplying --no-ack.
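A sketch of the same fetch without acknowledging the message:

```
$ nats con next ORDERS DISPATCH --no-ack
```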
To do this from code you'd send a Request() to $JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH:
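You can simulate that request with the nats CLI; the empty string here is the request payload:

```
$ nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH' ''
```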
Here nats req cannot ack, but in your code you'd respond to the received message with a nil payload as an Ack to JetStream.
Push-based Consumers deliver messages to a subject, and anyone who subscribes to that subject will get them. They support different Acknowledgement models, covered later, but here on the MONITOR Consumer we have no Acknowledgement.
First, confirm where the MONITOR Consumer delivers messages.
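Its info output includes the Delivery Subject, monitor.ORDERS in our example:

```
$ nats con info ORDERS MONITOR
```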
The Consumer is publishing to that subject, so let's listen there:
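A sketch of subscribing there with the nats CLI; each delivered message carries its original Stream subject and payload:

```
$ nats sub monitor.ORDERS
```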
Note that the subject of the received message is reported as ORDERS.processed; this helps you distinguish what you're seeing when a Stream covers a wildcard or multi-subject space.
This Consumer needs no ack, so any new message into the ORDERS system will show up here in real-time.