Sequin’s Consume API is an HTTP interface for processing all your API data. It provides the same capabilities as modern messaging systems, like Kafka or NATS. You can process data in parallel, at high throughput, and with strong guarantees about message delivery.

Background

In Sequin, a consumer is like a “consumer group” in Kafka, a “consumer” in NATS, and a queue in SQS. It defines a subset of all API records or events that you want to process. For example, you might have a consumer for salesforce:contact records and another for all stripe records.

For simplicity, we refer to records and events as messages in the Consume API.

You’ll define a consumer based on your application requirements. For example, if you’re just going to have one system process all messages, then it’s fine to have just one consumer that processes all messages. If you’re going to have multiple systems process messages, you can define a consumer for each that filters down to only the messages that system cares about.

You can have multiple workers (or instances) of your system process messages for a single consumer. Sequin delivers each message to only one of those workers at a time, so a message is processed once unless it is nacked or its ack wait expires. See more on processing and parallel processing below.

Setup

To begin, you’ll need to create a consumer. You can do so via the console or the Management API.

You can filter what subset of all API messages a consumer should receive. For example, you can configure a consumer to only receive messages from a specific API provider or only receive messages from a certain collection.

Here’s an example of creating a consumer via the Management API:

curl -X POST https://api.sequin.io/v1/http-consumers \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "name": "my-http-consumer",
    "ack_wait_ms": 30000,
    "max_deliver": -1,
    "deliver_policy": "all",
    "filters": {
      "collection_id": "salesforce:contact",
      "resource_id": "*"
    }
  }'

For more information on the available parameters and configuration options, see the Management API docs for consumers.

Processing messages

Once you have a consumer, you can start processing messages for it.

The first step is to call the /next endpoint to get a single message or a batch of messages:

curl -X GET "https://api.sequin.io/v1/http-consumers/{consumer}/next?batch_size=10" \
  -H "Authorization: Bearer {your-token}"

This will return a batch of messages, each containing an ack_token and a record, which holds the payload:

{
  "data": [
    {
      "ack_token": "MTYyeJ7abUjl1pO",
      "record": {
        "id": "aa9a329d-c3cb-4dea-9ea4-42e99cfc08d7",
        "collection_id": "salesforce:contact",
        "resource_id": "7d522b89-b092-40ec-9016-e69b81b5b317",
        "data": {
          "FirstName": "Paul",
          "LastName": "Atreides",
          // rest of payload...
        }
        // rest of record...
      }
    },
    // more messages...
  ],
  "info": {
    "num_pending": 500,
    "num_ack_pending": 20
  }
}

While your worker is processing this batch of messages, they will not be visible to other workers for the same consumer. This window defaults to 30 seconds and is configurable through your consumer’s ack_wait_ms setting.

Once your worker has finished processing the messages, you’ll acknowledge or ack them. This tells Sequin you’re done processing them, and ensures that workers for your consumer won’t see them again:

curl -X POST https://api.sequin.io/v1/http-consumers/{consumer}/ack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_tokens": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'
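
The fetch, process, and ack cycle above can be sketched as a small Python worker. This is a minimal sketch, not an official client: the helper names (build_next_request, build_ack_request, send, process_batch), the use of Python’s standard urllib, and the assumption that both endpoints return a JSON body are illustrative. Only the URLs, headers, and the ack_tokens payload come from the curl examples.

```python
import json
import urllib.request

# Base URL taken from the curl examples; all helper names are hypothetical.
BASE_URL = "https://api.sequin.io/v1/http-consumers"


def build_next_request(consumer, token, batch_size=10):
    """Build the GET /next request for up to batch_size messages."""
    url = f"{BASE_URL}/{consumer}/next?batch_size={batch_size}"
    return urllib.request.Request(
        url, headers={"Authorization": f"Bearer {token}"}, method="GET"
    )


def build_ack_request(consumer, token, ack_tokens):
    """Build the POST /ack request for the given ack tokens."""
    body = json.dumps({"ack_tokens": ack_tokens}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/{consumer}/ack",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


def send(request):
    """Send a request and decode its JSON body (assumed; empty bodies become {})."""
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    return json.loads(raw) if raw else {}


def process_batch(consumer, token, handle, transport=send):
    """Fetch one batch, run handle(record) on each message, then ack the batch.

    If handle raises, nothing is acked, so the batch becomes available
    again once ack_wait_ms elapses. Returns the number of acked messages.
    """
    batch = transport(build_next_request(consumer, token))
    ack_tokens = []
    for message in batch.get("data", []):
        handle(message["record"])
        ack_tokens.append(message["ack_token"])
    if ack_tokens:
        transport(build_ack_request(consumer, token, ack_tokens))
    return len(ack_tokens)
```

A worker would call process_batch("my-http-consumer", token, handle) in a loop. The transport parameter exists only so the HTTP layer can be swapped out, for example in tests.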

Alternatively, if you’re unable to process the messages, you can nack them. This tells Sequin to make the messages available for processing again:

curl -X POST https://api.sequin.io/v1/http-consumers/{consumer}/nack \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer {your-token}" \
  -d '{
    "ack_tokens": ["MTYyeJ7abUjl1pO", "MTYyeJ0p73hQak"]
  }'

Nacking is a good option if, for whatever reason, you can’t process the messages right away but you anticipate they will be processable shortly. For example, if you’re having difficulty connecting to a downstream database, you can nack in the hope that another worker with a working connection will pick up the messages.

Instead of nacking, your worker can also do nothing. Once ack_wait_ms elapses, the messages will be made available for processing again.
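
One way to combine these options is to ack the messages that succeeded and nack only the ones that failed. The sketch below assumes the message shape shown in the /next response (ack_token and record). The settle function and its handle callback are hypothetical names used for illustration.

```python
def settle(messages, handle):
    """Run handle(record) on each message and sort ack tokens by outcome.

    Returns (acks, nacks): tokens for messages that were processed, and
    tokens for messages whose handler raised and should be retried.
    """
    acks, nacks = [], []
    for message in messages:
        try:
            handle(message["record"])
        except Exception:
            # Processing failed (for example, a lost database connection):
            # hand the message back so another worker can pick it up.
            nacks.append(message["ack_token"])
        else:
            acks.append(message["ack_token"])
    return acks, nacks
```

The two lists map directly onto the ack_tokens bodies of the /ack and /nack requests shown above. A worker that crashes before sending either request falls back to the ack_wait_ms behavior.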

Parallel processing

You can have multiple workers process messages for a single consumer. This is a great way to scale your processing throughput.

Parallel processing can cause problems when a record or event is processed out of order. For example:

  • Worker A starts processing an update event for Record 1.
  • Another update event comes in for Record 1. It’s picked up by Worker B.
  • Worker B finishes processing the update event before Worker A.
  • Worker A finishes processing – but replaces the more up-to-date, accurate write from Worker B.

While the record stream contains each record only once, if a record is updated while it’s being processed, it will reappear in the stream. And while events are strictly ordered in the stream, they may be processed out of order, as in the example above.

You can protect against ordering issues by using the upstream_updated_at field on both records and events. For example, if your worker is upserting into a database, you can add a filter to the upsert such as where upstream_updated_at <= {timestamp}, with {timestamp} being the incoming message’s upstream_updated_at. The update then applies only when the stored row is no newer than the message, so you never overwrite a more recent write.
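
As an illustration of this guard, here is a minimal sketch using Python’s built-in sqlite3 module. The contacts table, its columns, and the helper names are hypothetical. It also assumes upstream_updated_at is stored as an ISO 8601 string in a single consistent format and timezone, so that string comparison matches chronological order.

```python
import sqlite3

# The WHERE clause makes the update a no-op when the stored row is newer
# than the incoming message, so a slow worker cannot clobber a fresher write.
UPSERT_SQL = """
INSERT INTO contacts (id, first_name, upstream_updated_at)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    first_name = excluded.first_name,
    upstream_updated_at = excluded.upstream_updated_at
WHERE contacts.upstream_updated_at <= excluded.upstream_updated_at
"""


def open_db():
    """Create an in-memory database with a hypothetical contacts table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE contacts ("
        "id TEXT PRIMARY KEY, first_name TEXT, upstream_updated_at TEXT)"
    )
    return conn


def upsert_contact(conn, record_id, first_name, upstream_updated_at):
    """Insert the row, or update it only if the message is not stale."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT_SQL, (record_id, first_name, upstream_updated_at))


def stored_contact(conn, record_id):
    """Return (first_name, upstream_updated_at) for a record, or None."""
    return conn.execute(
        "SELECT first_name, upstream_updated_at FROM contacts WHERE id = ?",
        (record_id,),
    ).fetchone()
```

In the Worker A and Worker B scenario above, Worker B’s newer write lands first. When Worker A later calls upsert_contact with the older timestamp, the WHERE clause rejects the update and the row keeps Worker B’s values.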

We’re considering adding more tooling to our streams and consumers to help with parallel processing (namely partitioning). If this is interesting to you, please contact us.