Overview

Consuming records from Sequin’s consumer involves three steps:

  1. Pull messages from the consumer
  2. Process the record in each message
  3. Acknowledge the messages

It is important to successfully process messages before acknowledging them.

Data

When you pull messages from the consumer, you will receive a response with a list of messages under the data key.

data
list
info
object

Each message in the data list has a record (for processing) and an ack_token (for acknowledging the message).

The info is useful to determine if there are more messages to pull.

Code

Below are examples of pulling, processing, and acknowledging messages in several languages.

Setup

Import dependencies, define constants, and define the entrypoint.

consume.js
const axios = require('axios');

const API_TOKEN = 'api-token';

// Rest of code goes here
// ...

if (process.argv.length != 3) {
  console.error('Usage: node consume.js <consumer_id>');
  process.exit(1);
}

const consumerId = process.argv[2];
run(consumerId);

Run

The run function is the entrypoint for the consumer. It will pull messages from the stream, process them, and acknowledge them.

If there are more messages you will immediately pull more. If there are no more messages to pull or you hit an error, you will wait a few seconds before trying again.

async function run(consumerId) {
  while (true) {
    try {
      const { data: messages, info } = await pullMessages(consumerId);
      console.info(`Pulled ${messages.length} messages`);

      // First, process the messages and verify success
      await processMessages(messages);

      // Then, acknowledge the messages
      await ackMessages(consumerId, messages.map((msg) => msg.ack_token));

      // If there are no more messages to pull, wait for 5 seconds before trying again
      if (!hasMore(info)) {
        console.info('No more messages to pull, sleeping for 5 seconds');
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    } catch (error) {
      console.error(`Failed to pull messages: ${error}`);
      // Wait for 5 seconds before trying again
      await new Promise((resolve) => setTimeout(resolve, 5000));
    }
  }
}

Pulling and acking messages

Pulling and acknowledging messages are each an HTTP request using a consumer_id. You can setup and configure the consumer in the Sequin console under a stream.

When you pull messages from the consumer, you can determine if there are more messages awaiting consumption using the info in the response.

async function pullMessages(consumerId) {
  const url = `https://api.sequin.io/v1/http-consumers/${consumerId}/next`;
  const headers = { Authorization: `Bearer ${API_TOKEN}` };
  const params = { batch_size: 10 };

  const response = await axios.get(url, { headers, params });
  return response.data;
}

async function processMessages(messages) {
  // TODO: process messages
}

async function ackMessages(consumerId, ackTokens) {
  if (ackTokens.length === 0) {
    return;
  }

  const url = `https://api.sequin.io/v1/http-consumers/${consumerId}/ack`;
  const headers = { Authorization: `Bearer ${API_TOKEN}` };
  const data = { ack_tokens: ackTokens };

  await axios.post(url, data, { headers });
}

function hasMore(info) {
  return info.num_pending  > 0;
}

Next steps

Once you are consuming records, you need to process them. Some common use cases include: