The number of messages that are delivered but not yet acknowledged for this consumer.
Each message in the data list has a record (for processing) and an ack_token (for acknowledging the message). The info is useful for determining whether there are more messages to pull.
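As a rough illustration of that shape, the sketch below separates a pulled data list into the records you would process and the ack tokens you would send back. The sample record contents and the `splitMessages` helper are hypothetical; only the `record` and `ack_token` keys come from the description above.

```javascript
// Hypothetical sample of the `data` list; the record contents are made up.
const data = [
  { record: { id: 1, body: 'first' }, ack_token: 'token-a' },
  { record: { id: 2, body: 'second' }, ack_token: 'token-b' },
];

// Split pulled messages into what you process (records)
// and what you acknowledge with (ack tokens).
function splitMessages(messages) {
  return {
    records: messages.map((msg) => msg.record),
    ackTokens: messages.map((msg) => msg.ack_token),
  };
}

const { records, ackTokens } = splitMessages(data);
console.log(`Got ${records.length} records and ${ackTokens.length} ack tokens`);
```

Keeping the two lists apart makes it easier to acknowledge only after processing has succeeded.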
[package]
name = "consume"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
log = "0.4"
env_logger = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
The run function is the entrypoint for the consumer. It pulls messages from the stream, processes them, and acknowledges them. If there are more messages, it immediately pulls again. If there are no more messages to pull, or an error occurs, it waits a few seconds before trying again.
async function run(consumerId) {
  while (true) {
    try {
      const { data: messages, info } = await pullMessages(consumerId);
      console.info(`Pulled ${messages.length} messages`);

      // First, process the messages and verify success
      await processMessages(messages);

      // Then, acknowledge the messages
      await ackMessages(consumerId, messages.map((msg) => msg.ack_token));

      // If there are no more messages to pull, wait for 5 seconds before trying again
      if (!hasMore(info)) {
        console.info('No more messages to pull, sleeping for 5 seconds');
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    } catch (error) {
      console.error(`Failed to pull messages: ${error}`);
      // Wait for 5 seconds before trying again
      await new Promise((resolve) => setTimeout(resolve, 5000));
    }
  }
}
Pulling and acknowledging messages are each an HTTP request that uses a consumer_id. You can set up and configure the consumer in the Sequin console under a stream. When you pull messages from the consumer, you can use the info in the response to determine whether more messages are awaiting consumption.
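The check on info could look something like the sketch below. It assumes that info exposes a numeric count of messages still waiting to be pulled; the field name `num_available` is a placeholder chosen for illustration, not a confirmed part of the response, so substitute whatever field your pull response actually returns.

```javascript
// Assumption: `info` carries a numeric count of messages still waiting to be
// pulled. The default field name `num_available` is a placeholder, not
// confirmed API; pass the real field name as the second argument.
function hasMore(info, field = 'num_available') {
  // Treat a missing info object or missing field as "nothing left to pull".
  const remaining = Number(info?.[field] ?? 0);
  return Number.isFinite(remaining) && remaining > 0;
}

console.log(hasMore({ num_available: 3 }));
console.log(hasMore({ num_available: 0 }));
```

Treating missing or malformed counts as "no more messages" means the consumer falls back to its sleep-and-retry path rather than pulling in a tight loop.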