Consume Records
How to consume records from Sequin’s consumer in your programming language.
Overview
Consuming records from Sequin’s consumer involves three steps:
- Pull messages from the consumer
- Process the record in each message
- Acknowledge the messages
It is important to successfully process messages before acknowledging them.
Data
When you pull messages from the consumer, you will receive a response with a list of messages under the data
key.
Each message in the data
list has a record
(for processing) and an ack_token
(for acknowledging the message).
The info
is useful to determine if there are more messages to pull.
Code
Below are examples of pulling, processing, and acknowledging messages in several languages.
Setup
Import dependencies, define constants, and define the entrypoint.
const axios = require('axios');
const API_TOKEN = 'api-token';
// Rest of code goes here
// ...
if (process.argv.length != 3) {
console.error('Usage: node consume.js <consumer_id>');
process.exit(1);
}
const consumerId = process.argv[2];
run(consumerId);
Run
The run
function is the entrypoint for the consumer. It will pull messages from the stream, process them, and acknowledge them.
If there are more messages you will immediately pull more. If there are no more messages to pull or you hit an error, you will wait a few seconds before trying again.
Pulling and acking messages
Pulling and acknowledging messages are each an HTTP request using a consumer_id
. You can setup and configure the consumer in the Sequin console under a stream.
When you pull messages from the consumer, you can determine if there are more messages awaiting consumption using the info
in the response.
Next steps
Once you are consuming records, you need to process them. Some common use cases include:
- Upsert to postgres
- Trigger workflows like automated emails
- Trigger a write through the Mutation API