
# Overview

> Sequin turns APIs into change data capture streams. The **Consume API** lets you read from those streams with robust messaging semantics. It's a great fit for building on top of your API data without needing to set up your own messaging infrastructure, like Kafka or NATS.

Sequin's Consume API is an HTTP interface for processing all your API data. It provides the same capabilities as modern messaging systems, like Kafka or NATS. You can process data in parallel, at high throughput, and with strong guarantees about message delivery.

## Background

In Sequin, a **consumer** is like a "consumer group" in Kafka, a "consumer" in NATS, and a queue in SQS. It defines a subset of all API records or events that you want to process. For example, you might have a consumer for `salesforce:contact` records and another for all `stripe` records.

For simplicity, we refer to records and events as *messages* in the Consume API.

You'll define consumers based on your application requirements. If a single system will process all messages, one consumer that receives everything is enough. If multiple systems will process messages, you can define a consumer for each, filtered down to only the messages that system cares about.

You can have multiple **workers** (or instances) of your system process records for a single consumer. A consumer will only process each message **exactly once**. See more on [processing](#processing-messages) and [parallel processing below](#parallel-processing).

## Setup

To begin, you'll need to create a **consumer**. You can do so via the console or the Management API.

You can filter what subset of all API messages a consumer should receive. For example, you can configure a consumer to only receive messages from a specific API provider or only receive messages from a certain collection.

Here's an example of creating a consumer via the Management API:

```bash theme={null}
curl -X POST https://api.sequin.io/v1/http-consumers \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "name": "my-http-consumer",
    "ack_wait_ms": 30000,
    "max_deliver": -1,
    "deliver_policy": "all",
    "filters": {
      "collection_id": "salesforce:contact",
      "resource_id": "*"
    }
  }'
```

For more information on the available parameters and configuration options, see the [Management API docs for consumers](/management-api/consumers).

## Processing messages

Once you have a consumer, you can start processing messages for it.

The first step is to make a call to the [`/next` endpoint](/streams/consume/next) to get one or a batch of messages:

```bash theme={null}
curl -X GET "https://api.sequin.io/v1/http-consumers/{consumer}/next?batch_size=10" \
  -H "Authorization: Bearer {your-token}"
```

This will return a batch of messages, each of which contains an `ack_token` and a payload:

```json theme={null}
{
  "data": [
    {
      "ack_token": "MTYyeJ7abUjl1pO",
      "record": {
        "id": "aa9a329d-c3cb-4dea-9ea4-42e99cfc08d7",
        "collection_id": "salesforce:contact",
        "resource_id": "7d522b89-b092-40ec-9016-e69b81b5b317",
        "data": {
          "FirstName": "Paul",
          "LastName": "Atreides",
          // rest of payload...
        }
        // rest of record...
      }
    },
    // more messages...
  ],
  "info": {
    "num_pending": 500,
    "num_ack_pending": 20
  }
}
```
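As a minimal sketch of handling this response in a worker, the helper below separates the ack tokens from the payloads. The function name `split_batch` is hypothetical, and the code assumes every message carries its payload under a `record` key, as in the sample above.

```python
from typing import Any


def split_batch(response: dict[str, Any]) -> tuple[list[str], list[dict[str, Any]]]:
    """Return (ack_tokens, records) from a parsed /next response body.

    Assumes each message has an "ack_token" and a "record" key,
    matching the sample response shown in this section.
    """
    messages = response.get("data", [])
    ack_tokens = [message["ack_token"] for message in messages]
    records = [message["record"] for message in messages]
    return ack_tokens, records


# Sample shaped like the response above, trimmed to the documented fields.
sample = {
    "data": [
        {
            "ack_token": "MTYyeJ7abUjl1pO",
            "record": {
                "id": "aa9a329d-c3cb-4dea-9ea4-42e99cfc08d7",
                "collection_id": "salesforce:contact",
                "data": {"FirstName": "Paul", "LastName": "Atreides"},
            },
        }
    ],
    "info": {"num_pending": 500, "num_ack_pending": 20},
}

tokens, records = split_batch(sample)
```

Keeping the tokens in a separate list makes it straightforward to pass them to the ack or nack request once processing is finished.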

While your worker is processing this batch of messages, they will not be visible to other workers for the same consumer. The amount of time these messages stay hidden defaults to `30` seconds and is configurable through your consumer's `ack_wait_ms` setting.

Once your worker has finished processing the messages, you'll acknowledge or **ack** them. This tells Sequin you're done processing them, and ensures that workers for your consumer won't see them again:

```bash theme={null}
curl -X POST https://api.sequin.io/v1/http-consumers/{consumer}/ack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_tokens": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'
```

Alternatively, if you're unable to process the messages, you can **nack** them. This tells Sequin to make the messages available for processing again:

```bash theme={null}
curl -X POST https://api.sequin.io/v1/http-consumers/{consumer}/nack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_tokens": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'
```

Nacking is a good option if you can't process the messages right away but anticipate they will be processable shortly. For example, if you're having difficulty connecting to a downstream database, you can nack in the hope that another worker with a working connection will pick up the messages.

Instead of nacking, your worker can also do nothing. After `ack_wait_ms` elapses, the messages will be made available for processing again.
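The ack and nack flow described above can be sketched as a small batch handler. This is an illustration under stated assumptions, not Sequin client code: `process_batch` and `handle` are hypothetical names, and the `ack` and `nack` callables stand in for the HTTP calls to the `/ack` and `/nack` endpoints so that the logic can be read and run without network access.

```python
from typing import Any, Callable

Message = dict[str, Any]


def process_batch(
    messages: list[Message],
    handle: Callable[[Message], None],
    ack: Callable[[list[str]], None],
    nack: Callable[[list[str]], None],
) -> tuple[list[str], list[str]]:
    """Run handle() on each message, then ack successes and nack failures."""
    done: list[str] = []
    failed: list[str] = []
    for message in messages:
        try:
            handle(message)
        except Exception:
            # Processing failed: collect the token so the message is redelivered.
            failed.append(message["ack_token"])
        else:
            done.append(message["ack_token"])
    if done:
        ack(done)  # stand-in for POST .../ack with {"ack_tokens": done}
    if failed:
        nack(failed)  # stand-in for POST .../nack with {"ack_tokens": failed}
    return done, failed


# Demo with in-memory stand-ins for the two endpoints.
acked: list[str] = []
nacked: list[str] = []


def handle(message: Message) -> None:
    """Hypothetical handler that simulates a downstream failure."""
    if "FirstName" not in message["record"]["data"]:
        raise ConnectionError("simulated downstream database failure")


batch = [
    {"ack_token": "token-1", "record": {"data": {"FirstName": "Paul"}}},
    {"ack_token": "token-2", "record": {"data": {}}},
]
result = process_batch(batch, handle, acked.extend, nacked.extend)
```

If the worker crashes before reaching either call, nothing is acked or nacked, and the messages become available again once `ack_wait_ms` elapses.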

### Parallel processing

You can have multiple workers process messages for a single consumer. This is a great way to scale your processing throughput.

Parallel processing can cause problems when records or events are processed out of order. For example:

* Worker A starts processing an update event for Record 1.
* Another update event comes in for Record 1. It's picked up by Worker B.
* Worker B finishes processing the update event before Worker A.
* Worker A finishes processing and overwrites the more recent, accurate write from Worker B.

While the record stream contains each record only once, if a record is updated while it's being processed, it will reappear in the stream. And while events are strictly ordered in the stream, they may be processed out of order, as in the example above.

You can protect against ordering issues by using the `upstream_updated_at` field on both records and events. For example, if your worker is upserting into a database, you can add a filter like `where upstream_updated_at <= {timestamp}` to the upsert, where `{timestamp}` is the incoming message's `upstream_updated_at`. The write then applies only when the stored row is not newer than the incoming message, so you never overwrite a more recent write.
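As one way to picture this guard, the sketch below uses SQLite's upsert syntax (available in SQLite 3.24 and later) through Python's built-in `sqlite3` module. The `contacts` table, its columns, and the `apply_update` helper are hypothetical. The timestamps are assumed to be ISO 8601 UTC strings in a single format, so that string comparison matches chronological order.

```python
import sqlite3

# Guarded upsert: the update applies only if the stored row is not newer
# than the incoming one. Table and column names are illustrative.
UPSERT = """
INSERT INTO contacts (id, last_name, upstream_updated_at)
VALUES (:id, :last_name, :upstream_updated_at)
ON CONFLICT(id) DO UPDATE SET
    last_name = excluded.last_name,
    upstream_updated_at = excluded.upstream_updated_at
WHERE contacts.upstream_updated_at <= excluded.upstream_updated_at
"""


def apply_update(conn: sqlite3.Connection, row: dict) -> None:
    """Upsert one row inside a transaction, skipping stale writes."""
    with conn:
        conn.execute(UPSERT, row)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE contacts ("
    "id TEXT PRIMARY KEY, last_name TEXT, upstream_updated_at TEXT NOT NULL)"
)

older = {"id": "rec-1", "last_name": "Atreides",
         "upstream_updated_at": "2024-05-01T12:00:00Z"}
newer = {"id": "rec-1", "last_name": "Muad'Dib",
         "upstream_updated_at": "2024-05-01T12:00:05Z"}

apply_update(conn, newer)  # Worker B finishes first with the newer update
apply_update(conn, older)  # Worker A finishes late; the guard skips its write

stored = conn.execute(
    "SELECT last_name, upstream_updated_at FROM contacts WHERE id = 'rec-1'"
).fetchone()
```

With the guard in place, the order in which workers finish no longer decides which version of the row survives.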

<Note>We're considering adding more tooling to our streams and consumers to help with parallel processing (namely partitioning). If this is interesting to you, please [contact us](mailto:support@sequin.io).</Note>
