> ## Documentation Index
> Fetch the complete documentation index at: https://sequin.io/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Consume Records

> How to consume records from Sequin's consumer in your programming language.

## Overview

Consuming records from Sequin's consumer involves three steps:

1. Pull messages from the consumer
2. Process the record in each message
3. Acknowledge the messages

<Note>
  It is important to successfully process messages before acknowledging them.
</Note>

## Data

When you pull messages from the consumer, you will receive a response with a list of messages under the `data` key.

<ResponseField name="data" type="list">
  <Expandable title="properties">
    <ResponseField name="record" type="Record">
      A [record](/core-concepts#records).
    </ResponseField>

    <ResponseField name="ack_token" type="string">
      You'll use the `ack_token` in follow-up requests to either [ack](/streams/consume/ack) or [nack](/streams/consume/ack) this specific message.
    </ResponseField>
  </Expandable>
</ResponseField>

<ResponseField name="info" type="object">
  <Expandable title="properties">
    <ResponseField name="num_pending" type="integer">
      The number of messages available for the consumer.
    </ResponseField>

    <ResponseField name="num_ack_pending" type="integer">
      The number of messages that are delivered but not yet acknowledged for this consumer.
    </ResponseField>
  </Expandable>
</ResponseField>

Each message in the `data` list has a `record` (for processing) and an `ack_token` (for acknowledging the message).

The `info` object tells you whether there are more messages to pull: if `num_pending` is greater than zero, more messages are waiting for the consumer.

## Code

Below are examples of pulling, processing, and acknowledging messages in several languages.

### Setup

Import dependencies, define constants, and define the entrypoint.

<Tabs>
  <Tab title="Javascript">
    ```javascript consume.js theme={null}
    const axios = require('axios');

    // Bearer token sent in the Authorization header of every request.
    // Replace the placeholder with your own token.
    const API_TOKEN = 'api-token';

    // Rest of code goes here
    // ...

    // Expect exactly one CLI argument, the consumer id
    // (argv[0] is the node binary and argv[1] is the script path).
    if (process.argv.length !== 3) {
      console.error('Usage: node consume.js <consumer_id>');
      process.exit(1);
    }

    const consumerId = process.argv[2];
    run(consumerId);
    ```
  </Tab>

  <Tab title="Python">
    ```python consume.py theme={null}
    import requests
    import time
    import sys
    import logging

    logging.basicConfig(level=logging.INFO)

    # Bearer token sent in the Authorization header of every request.
    API_TOKEN = "api-token"

    # Rest of code goes here
    # ...

    if __name__ == "__main__":
        # Everything after the script name; exactly one value (the consumer id) is expected.
        cli_args = sys.argv[1:]
        if len(cli_args) != 1:
            print("Usage: python consume.py <consumer_id>")
            sys.exit(1)

        run(cli_args[0])
    ```
  </Tab>

  <Tab title="Go">
    ```go consume.go theme={null}
    package main

    import (
    	"bytes"
    	"encoding/json"
    	"fmt"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"os"
    	"time"
    )

    // apiToken is the bearer token sent in the Authorization header of every request.
    const (
    	apiToken = "api-token"
    )

    // Response is the decoded body returned when pulling messages from the consumer.
    type Response struct {
    	Data []Message `json:"data"`
    	Info Info      `json:"info"`
    }

    // Message is a single pulled message. Only the ack token is decoded here;
    // add further fields (such as the record) as your processing requires.
    type Message struct {
    	AckToken string `json:"ack_token"`
    }

    // Info carries the consumer's pending-message count, used to decide whether to pull again.
    type Info struct {
    	NumPending int `json:"num_pending"`
    }

    // main validates the command line (program name plus one consumer id)
    // and then hands control to the consume loop.
    func main() {
    	if argc := len(os.Args); argc != 2 {
    		fmt.Println("Usage: go run consume.go <consumer_id>")
    		os.Exit(1)
    	}

    	run(os.Args[1])
    }
    ```
  </Tab>

  <Tab title="Ruby">
    ```ruby consume.rb theme={null}
    require 'net/http'
    require 'json'
    require 'logger'

    # Bearer token sent in the Authorization header of every request.
    API_TOKEN = "api-token"
    logger = Logger.new(STDOUT)

    # Rest of code goes here
    # ...


    # Exactly one argument (the consumer id) is expected.
    unless ARGV.length == 1
      puts "Usage: ruby consume.rb <consumer_id>"
      exit 1
    end

    run(ARGV.first, logger)
    ```
  </Tab>

  <Tab title="Rust">
    ```toml Cargo.toml theme={null}
    [package]
    name = "consume"
    version = "0.1.0"
    edition = "2021"

    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

    [dependencies]
    # HTTP client used to call the consumer API (with JSON body support).
    reqwest = { version = "0.11", features = ["json"] }
    tokio = { version = "1", features = ["full"] }
    log = "0.4"
    env_logger = "0.9"
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    thiserror = "1.0"
    ```

    ```rust main.rs theme={null}
    use reqyoust::Client;
    use std::env;
    use std::time::Duration;
    use tokio::time::sleep;
    use log::{info, error};
    use serde::{Deserialize, Serialize};
    use serde_json;
    use thiserror::Error;

    const API_TOKEN: &str = "api-token";

    #[derive(Debug, Error)]
    enum MyError {
        #[error("Request error: {0}")]
        Request(#[from] reqyoust::Error),
        #[error("JSON parse error: {0}")]
        Json(#[from] serde_json::Error),
    }

    #[derive(Deserialize, Debug)]
    struct Response {
        data: Vec<Message>,
        info: Info,
    }

    #[derive(Deserialize, Debug)]
    struct Message {
        ack_token: String,
        // Other fields as needed
    }

    #[derive(Deserialize, Debug)]
    struct Info {
        num_pending: u32,
    }

    #[derive(Serialize, Debug)]
    struct AckData {
        ack_tokens: Vec<String>,
    }

    // Rest of code goes here
    // ...

    #[tokio::main]
    async fn main() {
        env_logger::init();

        let args: Vec<String> = env::args().collect();
        if args.len() < 2 {
            eprintln!("Usage: {} <consumer_id>", args[0]);
            std::process::exit(1);
        }

        let consumer_id = &args[1];
        let client = Client::new();
        run(&client, consumer_id).await;
    }
    ```
  </Tab>

  <Tab title="Swift">
    ```swift consume.swift theme={null}
    import Foundation

    /// Bearer token sent in the Authorization header of every request.
    let apiToken = "api-token"

    /// A single pulled message. Only the ack token is decoded here.
    struct Message: Decodable {
        let ackToken: String

        // Maps the snake_case JSON key onto the camelCase property.
        enum CodingKeys: String, CodingKey {
            case ackToken = "ack_token"
        }
    }

    /// Pending-message count used to decide whether to pull again.
    struct Info: Decodable {
        let numPending: Int

        // Maps the snake_case JSON key onto the camelCase property.
        enum CodingKeys: String, CodingKey {
            case numPending = "num_pending"
        }
    }

    /// Body returned when pulling messages from the consumer.
    struct Response: Decodable {
        let data: [Message]
        let info: Info
    }

    // Rest of code goes here
    // ...

    // The first argument is the script itself, so a consumer id must follow it.
    guard CommandLine.arguments.count >= 2 else {
        print("Usage: swift consume.swift <consumer_id>")
        exit(1)
    }

    let consumerId = CommandLine.arguments[1]
    run(consumerId: consumerId)
    ```
  </Tab>

  <Tab title="Elixir">
    ```elixir consume.exs theme={null}
    Mix.install([:req])

    # Container module for the consumer functions shown in the following sections.
    defmodule Consume do
      @moduledoc """
      Documentation for `Consume`.
      """

      require Logger

      # Bearer token sent in the authorization header of every request.
      @api_token "api-token"

      # Rest of code goes here
      # ...
    end

    # Exactly one argument (the consumer id) is expected; anything else prints usage.
    System.argv()
    |> case do
      [consumer_id] -> Consume.run(consumer_id)
      _ -> IO.puts("Usage: elixir consume.exs <consumer_id>")
    end
    ```
  </Tab>
</Tabs>

### Run

The `run` function is the entrypoint for the consumer. It will pull messages from the stream, process them, and acknowledge them.

If there are more messages, you will immediately pull more. If there are no more messages to pull, or you hit an error, you will wait 5 seconds before trying again.

<CodeGroup>
  ```javascript Javascript theme={null}
  /**
   * Consume loop: pull a batch, process it, acknowledge it, and repeat forever.
   * Waits 5 seconds when nothing is pending or when any step throws.
   */
  async function run(consumerId) {
    // Promise-based delay shared by the idle wait and the error back-off.
    const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    for (;;) {
      try {
        const response = await pullMessages(consumerId);
        const messages = response.data;
        const info = response.info;
        console.info(`Pulled ${messages.length} messages`);

        // Processing must succeed before anything is acknowledged.
        await processMessages(messages);

        const ackTokens = messages.map((message) => message.ack_token);
        await ackMessages(consumerId, ackTokens);

        // Only idle when the consumer reports nothing pending.
        if (!hasMore(info)) {
          console.info('No more messages to pull, sleeping for 5 seconds');
          await pause(5000);
        }
      } catch (error) {
        console.error(`Failed to pull messages: ${error}`);
        // Back off before the next attempt.
        await pause(5000);
      }
    }
  }
  ```

  ```python Python theme={null}
  def run(consumer_id):
      """Pull, process and acknowledge messages forever.

      Sleeps 5 seconds when nothing is pending or when any step raises.
      """
      while True:
          try:
              payload = pull_messages(consumer_id)
              messages, info = payload["data"], payload["info"]
              logging.info(f"Pulled {len(messages)} messages")

              # Processing must succeed before anything is acknowledged.
              process_messages(messages)

              ack_tokens = [message["ack_token"] for message in messages]
              ack_messages(consumer_id, ack_tokens)

              # More work is pending: go straight back to pulling.
              if has_more(info):
                  continue

              logging.info("No more messages to pull, sleeping for 5 seconds")
              time.sleep(5)

          except Exception as e:
              logging.error(f"Failed to pull messages: {e}")
              # Back off before the next attempt.
              time.sleep(5)
  ```

  ```go Go theme={null}
  // run pulls, processes and acknowledges messages in an endless loop.
  // It sleeps 5 seconds after a failed pull or when nothing is pending.
  func run(consumerID string) {
  	for {
  		response, err := pullMessages(consumerID)
  		if err != nil {
  			log.Printf("Failed to pull messages: %v\n", err)
  			time.Sleep(5 * time.Second)
  			continue
  		}

  		log.Printf("Pulled %d messages\n", len(response.Data))

  		// Process first, acknowledge afterwards.
  		processMessages(response.Data)

  		ackTokens := make([]string, 0, len(response.Data))
  		for _, msg := range response.Data {
  			ackTokens = append(ackTokens, msg.AckToken)
  		}

  		if ackErr := ackMessages(consumerID, ackTokens); ackErr != nil {
  			log.Printf("Failed to acknowledge messages: %v\n", ackErr)
  		}

  		// More work is pending: go straight back to pulling.
  		if hasMore(response.Info) {
  			continue
  		}

  		log.Println("No more messages to pull, sleeping for 5 seconds")
  		time.Sleep(5 * time.Second)
  	}
  }
  ```

  ```rust Rust theme={null}
  /// Pulls, processes and acknowledges messages in an endless loop.
  /// Sleeps 5 seconds after a failed pull or when nothing is pending.
  async fn run(client: &Client, consumer_id: &str) {
      loop {
          let response = match pull_messages(client, consumer_id).await {
              Ok(response) => response,
              Err(e) => {
                  error!("Failed to pull messages: {}", e);
                  // Back off before the next attempt.
                  sleep(Duration::from_secs(5)).await;
                  continue;
              }
          };

          let messages = response.data;
          let pending_info = response.info;
          info!("Pulled {} messages", messages.len());

          // Process first, acknowledge afterwards.
          process_messages(&messages).await;

          let ack_tokens: Vec<String> = messages
              .iter()
              .map(|msg| msg.ack_token.clone())
              .collect();
          if let Err(e) = ack_messages(client, consumer_id, &ack_tokens).await {
              error!("Failed to acknowledge messages: {}", e);
          }

          // Only idle when the consumer reports nothing pending.
          if !has_more(&pending_info) {
              info!("No more messages to pull, sleeping for 5 seconds");
              sleep(Duration::from_secs(5)).await;
          }
      }
  }
  ```

  ```ruby Ruby theme={null}
  # Pulls, processes and acknowledges messages forever.
  # Sleeps 5 seconds when nothing is pending or when any step raises.
  def run(consumer_id, logger)
    loop do
      begin
        response = pull_messages(consumer_id)
        messages, info = response.values_at("data", "info")
        logger.info("Pulled #{messages.length} messages")

        # Processing must succeed before anything is acknowledged.
        process_messages(messages)

        ack_tokens = messages.map { |message| message["ack_token"] }
        ack_messages(consumer_id, ack_tokens)

        # More work is pending: go straight back to pulling.
        next if has_more(info)

        logger.info("No more messages to pull, sleeping for 5 seconds")
        sleep 5

      rescue StandardError => e
        logger.error("Failed to pull messages: #{e}")
        # Back off before the next attempt.
        sleep 5
      end
    end
  end
  ```

  ```swift Swift theme={null}
  /// Pulls, processes and acknowledges messages forever.
  /// Sleeps 5 seconds when nothing is pending or when pulling/acknowledging throws.
  func run(consumerId: String) {
      while true {
          do {
              let response = try pullMessages(consumerId: consumerId)
              print("Pulled \(response.data.count) messages")

              // Process first, acknowledge afterwards.
              processMessages(messages: response.data)

              let ackTokens = response.data.map { $0.ackToken }
              try ackMessages(consumerId: consumerId, ackTokens: ackTokens)

              // Only idle when the consumer reports nothing pending.
              guard hasMore(info: response.info) else {
                  print("No more messages to pull, sleeping for 5 seconds")
                  sleep(5)
                  continue
              }
          } catch {
              print("Failed to pull messages: \(error)")
              // Back off before the next attempt.
              sleep(5)
          }
      }
  }
  ```

  ```elixir Elixir theme={null}
    # Pulls, processes and acknowledges messages, then calls itself to repeat.
    # Sleeps 5 seconds after a failed pull or when nothing is pending.
    def run(consumer_id) do
      case pull_messages(consumer_id) do
        {:ok, %{messages: messages, info: info}} ->
          Logger.info("Pulled #{length(messages)} messages")

          # First, process the messages and verify success
          :ok = process_messages(messages)

          # Then, acknowledge the messages. A failed ack is logged rather than
          # silently dropped so it is visible to whoever runs the consumer.
          case ack_messages(consumer_id, messages) do
            :ok ->
              :ok

            {:error, error} ->
              Logger.error("Failed to acknowledge messages: #{inspect(error)}")
          end

          # If there are no more messages to pull, wait for 5 seconds before trying again
          if not has_more?(info) do
            Logger.info("No more messages to pull, sleeping for 5 seconds")
            Process.sleep(:timer.seconds(5))
          end

        {:error, error} ->
          Logger.error("Failed to pull messages: #{inspect(error)}")

          # Wait for 5 seconds before trying again
          Process.sleep(:timer.seconds(5))
      end

      # Recursively call the function to keep pulling messages
      run(consumer_id)
    end
  ```
</CodeGroup>

### Pulling and acking messages

Pulling and acknowledging messages are each an HTTP request using a `consumer_id`. You can set up and configure the consumer in the [Sequin console](https://console.sequin.io) under a stream.

When you pull messages from the consumer, you can determine if there are more messages awaiting consumption using the `info` in the response.

<CodeGroup>
  ```javascript Javascript theme={null}
  /**
   * Requests the next batch (batch_size 10) for the given consumer
   * and returns the parsed response body.
   */
  async function pullMessages(consumerId) {
    const { data } = await axios.get(
      `https://api.sequin.io/v1/http-consumers/${consumerId}/next`,
      {
        headers: { Authorization: `Bearer ${API_TOKEN}` },
        params: { batch_size: 10 }
      }
    );

    return data;
  }

  /**
   * Placeholder for application-specific handling of a batch of pulled messages.
   * `run` awaits this before acknowledging, so a rejection here skips the ack.
   */
  async function processMessages(messages) {
    // TODO: process messages
  }

  /**
   * Acknowledges the given ack tokens for the consumer with a single POST.
   */
  async function ackMessages(consumerId, ackTokens) {
    // Nothing to acknowledge: skip the HTTP round trip.
    if (ackTokens.length === 0) {
      return;
    }

    await axios.post(
      `https://api.sequin.io/v1/http-consumers/${consumerId}/ack`,
      { ack_tokens: ackTokens },
      { headers: { Authorization: `Bearer ${API_TOKEN}` } }
    );
  }

  /**
   * Returns true when the pull response's `info` reports pending messages.
   */
  function hasMore(info) {
    const { num_pending: pending } = info;
    return pending > 0;
  }
  ```

  ```python Python theme={null}
  def pull_messages(consumer_id):
      """Request the next batch (batch_size 10) and return the decoded JSON body.

      Raises on a non-success HTTP status via ``raise_for_status``.
      """
      response = requests.get(
          f"https://api.sequin.io/v1/http-consumers/{consumer_id}/next",
          headers={"Authorization": f"Bearer {API_TOKEN}"},
          params={"batch_size": 10},
      )
      response.raise_for_status()

      return response.json()

  def process_messages(messages):
      """Placeholder for application-specific handling of a batch of pulled messages.

      ``run`` calls this before acknowledging, so raising here skips the ack.
      """
      # TODO: process messages
      pass

  def ack_messages(consumer_id, ack_tokens):
      """Acknowledge the given ack tokens for the consumer with a single POST.

      Raises on a non-success HTTP status via ``raise_for_status``.
      """
      # Nothing to acknowledge: skip the HTTP round trip.
      if not ack_tokens:
          return

      response = requests.post(
          f"https://api.sequin.io/v1/http-consumers/{consumer_id}/ack",
          headers={"Authorization": f"Bearer {API_TOKEN}"},
          json={"ack_tokens": ack_tokens},
      )
      response.raise_for_status()

  def has_more(info):
      """Return True when ``info`` reports pending messages (a missing key counts as 0)."""
      pending = info.get("num_pending", 0)
      return pending > 0
  ```

  ```go Go theme={null}
  // pullMessages requests the next batch (batch_size 10) for the consumer and
  // decodes the JSON body. Any status other than 200 is returned as an error.
  func pullMessages(consumerID string) (*Response, error) {
  	endpoint := fmt.Sprintf("https://api.sequin.io/v1/http-consumers/%s/next", consumerID)
  	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
  	if err != nil {
  		return nil, err
  	}

  	req.Header.Set("Authorization", "Bearer "+apiToken)

  	// Attach the batch size as a query parameter.
  	query := req.URL.Query()
  	query.Add("batch_size", "10")
  	req.URL.RawQuery = query.Encode()

  	client := &http.Client{}
  	resp, err := client.Do(req)
  	if err != nil {
  		return nil, err
  	}
  	defer resp.Body.Close()

  	if resp.StatusCode != http.StatusOK {
  		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  	}

  	body, err := ioutil.ReadAll(resp.Body)
  	if err != nil {
  		return nil, err
  	}

  	parsed := new(Response)
  	if err := json.Unmarshal(body, parsed); err != nil {
  		return nil, err
  	}

  	return parsed, nil
  }

  // processMessages is a placeholder for application-specific handling of a
  // batch of pulled messages; run calls it before acknowledging the batch.
  func processMessages(messages []Message) {
  	// TODO: process messages
  }

  // ackMessages acknowledges the given ack tokens for the consumer with a
  // single POST. Any status other than 204 is returned as an error.
  func ackMessages(consumerID string, ackTokens []string) error {
  	// Nothing to acknowledge: skip the HTTP round trip.
  	if len(ackTokens) == 0 {
  		return nil
  	}

  	payload, err := json.Marshal(map[string][]string{"ack_tokens": ackTokens})
  	if err != nil {
  		return err
  	}

  	endpoint := fmt.Sprintf("https://api.sequin.io/v1/http-consumers/%s/ack", consumerID)
  	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
  	if err != nil {
  		return err
  	}

  	req.Header.Set("Authorization", "Bearer "+apiToken)
  	req.Header.Set("Content-Type", "application/json")

  	client := &http.Client{}
  	resp, err := client.Do(req)
  	if err != nil {
  		return err
  	}
  	defer resp.Body.Close()

  	if resp.StatusCode != http.StatusNoContent {
  		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  	}

  	return nil
  }

  // hasMore reports whether the pull response indicates pending messages.
  func hasMore(info Info) bool {
  	pending := info.NumPending
  	return pending > 0
  }
  ```

  ```rust Rust theme={null}
  /// Requests the next batch (batch_size 10) for the consumer and parses the body.
  ///
  /// Returns `MyError::Request` when the request fails or the server answers with
  /// an error status, and `MyError::Json` when the body cannot be parsed.
  async fn pull_messages(client: &Client, consumer_id: &str) -> Result<Response, MyError> {
      let url = format!("https://api.sequin.io/v1/http-consumers/{}/next", consumer_id);
      let body = client
          .get(&url)
          .bearer_auth(API_TOKEN)
          .query(&[("batch_size", "10")])
          .send()
          .await?
          // Surface 4xx/5xx responses as request errors instead of trying to
          // parse an error body as a message batch.
          .error_for_status()?
          .text()
          .await?;

      let response: Response = serde_json::from_str(&body)?;
      Ok(response)
  }

  /// Placeholder for application-specific handling of a batch of pulled messages;
  /// `run` awaits it before acknowledging the batch.
  async fn process_messages(_messages: &[Message]) {
      // TODO: process messages
  }

  async fn ack_messages(client: &Client, consumer_id: &str, ack_tokens: &[String]) -> Result<(), reqyoust::Error> {
      if ack_tokens.is_empty() {
          return Ok(());
      }

      let url = format!("https://api.sequin.io/v1/http-consumers/{}/ack", consumer_id);
      let data = AckData {
          ack_tokens: ack_tokens.to_vec(),
      };

      client
          .post(&url)
          .bearer_auth(API_TOKEN)
          .json(&data)
          .send()
          .await?
          .error_for_status()?;

      Ok(())
  }

  /// Reports whether the pull response indicates pending messages.
  fn has_more(info: &Info) -> bool {
      // `num_pending` is unsigned, so "greater than zero" is the same as "non-zero".
      info.num_pending != 0
  }
  ```

  ```ruby Ruby theme={null}
  # Requests the next batch (batch_size 10) for the consumer and returns the
  # parsed JSON body. Raises when the response is not a success.
  def pull_messages(consumer_id)
    uri = URI("https://api.sequin.io/v1/http-consumers/#{consumer_id}/next")
    uri.query = URI.encode_www_form("batch_size" => 10)

    request = Net::HTTP::Get.new(uri)
    request["Authorization"] = "Bearer #{API_TOKEN}"

    use_ssl = uri.scheme == "https"
    response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: use_ssl) { |http| http.request(request) }

    unless response.is_a?(Net::HTTPSuccess)
      raise "HTTP request failed: #{response.code} #{response.message}"
    end

    JSON.parse(response.body)
  end

  # Placeholder for application-specific handling of a batch of pulled messages.
  # `run` calls this before acknowledging, so raising here skips the ack.
  def process_messages(messages)
    # TODO: process messages
  end

  # Acknowledges the given ack tokens for the consumer with a single POST.
  # Raises when the response is not a success.
  def ack_messages(consumer_id, ack_tokens)
    # Nothing to acknowledge: skip the HTTP round trip.
    return if ack_tokens.empty?

    uri = URI("https://api.sequin.io/v1/http-consumers/#{consumer_id}/ack")

    request = Net::HTTP::Post.new(uri)
    request["Authorization"] = "Bearer #{API_TOKEN}"
    request.content_type = "application/json"
    request.body = JSON.generate("ack_tokens" => ack_tokens)

    use_ssl = uri.scheme == "https"
    response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: use_ssl) { |http| http.request(request) }

    unless response.is_a?(Net::HTTPSuccess)
      raise "HTTP request failed: #{response.code} #{response.message}"
    end
  end

  # Returns true when `info` reports pending messages (a missing key counts as 0).
  def has_more(info)
    pending = info.fetch("num_pending", 0)
    pending > 0
  end
  ```

  ```swift Swift theme={null}
  /// Requests the next batch (batch_size 10) for the consumer and decodes the body.
  /// Throws `URLError(.badServerResponse)` unless the response is an HTTP 200.
  func pullMessages(consumerId: String) throws -> Response {
      let baseURL = URL(string: "https://api.sequin.io/v1/http-consumers/\(consumerId)/next")!
      let params: [String: String] = ["batch_size": "10"]

      var request = URLRequest(url: baseURL)
      request.setValue("Bearer \(apiToken)", forHTTPHeaderField: "Authorization")
      // Append the query string to the endpoint URL.
      request.url = URL(string: baseURL.absoluteString + "?" + params.queryString)

      let (data, response) = try URLSession.shared.syncRequest(with: request)

      // A non-HTTP response yields nil here, which also fails the check.
      let statusCode = (response as? HTTPURLResponse)?.statusCode
      if statusCode != 200 {
          throw URLError(.badServerResponse)
      }

      return try JSONDecoder().decode(Response.self, from: data)
  }

  /// Placeholder for application-specific handling of a batch of pulled messages;
  /// `run` calls it before acknowledging the batch.
  func processMessages(messages: [Message]) {
      // TODO: process messages
  }

  /// Acknowledges the given ack tokens for the consumer with a single POST.
  ///
  /// Throws `URLError(.badServerResponse)` when the response is not an HTTP
  /// response, and an `NSError` (domain "HTTPError", code = status code, body in
  /// `userInfo`) when the status is anything other than 204.
  func ackMessages(consumerId: String, ackTokens: [String]) throws {
      // Nothing to acknowledge: skip the HTTP round trip.
      guard !ackTokens.isEmpty else { return }

      let url = URL(string: "https://api.sequin.io/v1/http-consumers/\(consumerId)/ack")!
      var request = URLRequest(url: url)
      request.httpMethod = "POST"
      request.setValue("Bearer \(apiToken)", forHTTPHeaderField: "Authorization")
      request.setValue("application/json", forHTTPHeaderField: "Content-Type")
      request.httpBody = try JSONSerialization.data(withJSONObject: ["ack_tokens": ackTokens], options: [])

      let (data, response) = try URLSession.shared.syncRequest(with: request)

      // Do not treat an unrecognised response as a successful acknowledgement.
      guard let httpResponse = response as? HTTPURLResponse else {
          throw URLError(.badServerResponse)
      }

      guard httpResponse.statusCode == 204 else {
          let responseBody = String(data: data, encoding: .utf8) ?? "No response body"
          throw NSError(domain: "HTTPError", code: httpResponse.statusCode, userInfo: ["body": responseBody])
      }
  }

  /// Reports whether the pull response indicates pending messages.
  func hasMore(info: Info) -> Bool {
      let pending = info.numPending
      return pending > 0
  }

  extension Dictionary {
      /// Joins the entries as `key=value` pairs separated by `&`.
      /// Keys and values are interpolated as-is; no percent-encoding is applied.
      var queryString: String {
          let pairs = self.map { entry in "\(entry.key)=\(entry.value)" }
          return pairs.joined(separator: "&")
      }
  }

  extension URLSession {
      /// Performs the request and blocks the calling thread until it completes.
      ///
      /// Throws the task's error if there is one, or `URLError(.badServerResponse)`
      /// if the task finished without both data and a response.
      func syncRequest(with request: URLRequest) throws -> (Data, URLResponse) {
          var data: Data?
          var response: URLResponse?
          var error: Error?

          // The semaphore turns the callback-based data task into a blocking call.
          let semaphore = DispatchSemaphore(value: 0)
          let task = self.dataTask(with: request) { (taskData, taskResponse, taskError) in
              data = taskData
              response = taskResponse
              error = taskError
              semaphore.signal()
          }
          task.resume()
          semaphore.wait()

          if let error = error {
              throw error
          }

          // Avoid force-unwrapping: fail with an error instead of crashing.
          guard let data = data, let response = response else {
              throw URLError(.badServerResponse)
          }

          return (data, response)
      }
  }
  ```

  ```elixir Elixir theme={null}
    # Requests the next batch (batch_size 10) for the consumer.
    # Returns {:ok, %{messages: ..., info: ...}} for a 200 response with the
    # expected body shape, and {:error, reason} for anything else.
    defp pull_messages(consumer_id) do
      url = "https://api.sequin.io/v1/http-consumers/#{consumer_id}/next"

      request_opts = [
        headers: %{authorization: "Bearer #{@api_token}"},
        params: %{batch_size: 10}
      ]

      case Req.get(url, request_opts) do
        {:ok, %Req.Response{status: 200, body: %{"data" => messages, "info" => info}}} ->
          {:ok, %{messages: messages, info: info}}

        {:ok, response} ->
          {:error, response}

        {:error, error} ->
          {:error, error}
      end
    end

    # Placeholder for application-specific handling of a batch of pulled messages.
    # `run` matches the result against :ok before acknowledging the batch.
    defp process_messages(_messages) do
      # TODO: process messages
      :ok
    end

    # With nothing to ack there is no need for an HTTP request.
    defp ack_messages(_consumer_id, []), do: :ok

    # Acknowledges every message in the batch with a single POST.
    # Returns :ok on a 204 response and {:error, reason} otherwise.
    defp ack_messages(consumer_id, messages) do
      url = "https://api.sequin.io/v1/http-consumers/#{consumer_id}/ack"
      ack_tokens = Enum.map(messages, fn message -> message["ack_token"] end)

      result =
        Req.post(
          url,
          json: %{ack_tokens: ack_tokens},
          headers: %{authorization: "Bearer #{@api_token}"}
        )

      case result do
        # Note: A 204 status code indicates successful acknowledgement
        {:ok, %Req.Response{status: 204}} -> :ok
        {:ok, response} -> {:error, response}
        {:error, error} -> {:error, error}
      end
    end

    # There is more to pull whenever the consumer reports pending messages.
    defp has_more?(%{"num_pending" => pending}) do
      pending > 0
    end
  ```
</CodeGroup>

## Next steps

Once you are consuming records, you need to process them. Some common use cases include:

* [Upsert to postgres](/how-to/upsert-to-postgres)
* Trigger workflows like automated emails
* Trigger a write through the [Mutation API](/mutations)
