You read data from Sequin via consumers. Consumers guarantee at-least-once delivery, so your workers may occasionally receive the same message more than once. With idempotent upsert logic, redeliveries are harmless, and you can be confident that your database completely matches the state of the API.

Schema design

By letting you control the schema of your Postgres tables, Sequin makes it easy for you to manage things like table design, migrations, and transforms in your preferred programming language and ORM.

A Sequin record contains a number of standard fields plus all the API-specific fields for a given collection. It’s usually a good idea to store a few of these standard fields as columns in each table:

| Field name | Type | Description |
| --- | --- | --- |
| id | text | The unique ID of the record. This ID is unique across syncs and providers. You can use it as the primary key in your database. |
| upstream_id | text | The ID of the record in the API. Useful for relationships between tables. |
| deleted | boolean | Whether the record has been deleted in the API. |
| updated_idx | bigint | A sequential index that increments every time the record is updated. Indicates the version of the record. See Advanced upserts below for ways you can leverage this field. |
| sync.id | text | The ID of the sync this record belongs to. |
| sync.external_id | text | The external_id of the sync this record belongs to. You can set external_id to any text value you’d like to correlate a sync to a user or account in your system. |
| sync.metadata | jsonb | The metadata of the sync this record belongs to. You can store any information you’d like in metadata in order to correlate a sync to a user or account in your system. |
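If your worker receives records as parsed JSON, pulling these standard fields into flat column values might look like the sketch below. It assumes each record is a Python dict with a nested `sync` object, mirroring the `sync.id` notation in the table. The exact payload shape and the helper name `record_to_row` are assumptions, so check them against the records your consumer actually receives.

```python
import json
from typing import Any


def record_to_row(record: dict[str, Any]) -> dict[str, Any]:
    """Flatten the standard fields of a Sequin record into column values.

    Hypothetical helper: assumes sync fields live under record["sync"].
    """
    sync = record.get("sync") or {}
    return {
        "id": record["id"],
        "upstream_id": record.get("upstream_id"),
        "deleted": bool(record.get("deleted", False)),
        "updated_idx": int(record["updated_idx"]),
        "sync_id": sync.get("id"),
        "sync_external_id": sync.get("external_id"),
        # jsonb values are commonly handed to database drivers as a JSON string
        "sync_metadata": json.dumps(sync.get("metadata") or {}),
    }
```

The nested `sync.*` fields become the `sync_id`, `sync_external_id`, and `sync_metadata` columns used in the table definition.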

An example create table statement that leverages a record’s fields might look like this:

create table stripe.subscriptions (
  id text primary key,
  upstream_id text,
  deleted boolean,
  updated_idx bigint,
  sync_id text,
  sync_external_id text,
  sync_metadata jsonb,
  -- these are often helpful to store too
  upstream_updated_at timestamp with time zone,
  upstream_created_at timestamp with time zone,
  -- ...
  -- add record-specific fields here. For example, for Stripe subscriptions, you'd add columns for `stripe_customer_id`, `plan_id`, `quantity`, etc.
  -- ...
);

It’s a good idea to add some indexes as well:

create index stripe_subscriptions_sync_external_id_idx
on stripe.subscriptions (sync_external_id);
-- you might also consider indexing other ID columns to speed up joins and lookups
create index stripe_subscriptions_upstream_id_idx
on stripe.subscriptions (upstream_id);

Upserting

Each record in Sequin has a collection_slug that you can use to determine which table it belongs to.
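A worker that handles several collections can group each batch by `collection_slug` and run one upsert per table. In the sketch below, the slug strings and table names in `TABLES_BY_SLUG` are hypothetical placeholders. Substitute the slugs your syncs actually emit.

```python
from typing import Any

# Hypothetical mapping from collection_slug to destination table.
TABLES_BY_SLUG: dict[str, str] = {
    "stripe_subscription": "stripe.subscriptions",
    "github_issue": "github.issues",
}


def group_by_table(records: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group a batch of records by destination table, preserving order."""
    groups: dict[str, list[dict[str, Any]]] = {}
    for record in records:
        slug = record["collection_slug"]
        table = TABLES_BY_SLUG.get(slug)
        if table is None:
            # Fail loudly rather than silently dropping records for unknown collections.
            raise KeyError(f"no table configured for collection_slug {slug!r}")
        groups.setdefault(table, []).append(record)
    return groups
```

Raising on an unknown slug means the batch isn't acked, so nothing is lost while you add the missing mapping.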

In raw SQL, an upsert of Sequin data into a table might look like this:

insert into stripe.subscriptions (id, deleted, stripe_customer_id, plan_id, quantity, sync_metadata, sync_external_id, upstream_updated_at, upstream_created_at, updated_idx)
  values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
  on conflict (id)
    do update set
      deleted = excluded.deleted,
      stripe_customer_id = excluded.stripe_customer_id,
      plan_id = excluded.plan_id,
      quantity = excluded.quantity,
      sync_metadata = excluded.sync_metadata,
      sync_external_id = excluded.sync_external_id,
      upstream_updated_at = excluded.upstream_updated_at,
      -- upstream_created_at is only written on insert, since it never changes
      updated_idx = excluded.updated_idx;
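Because delivery is at-least-once, the same message can arrive twice, and the upsert has to tolerate that. The sketch below runs a trimmed version of the statement through Python's built-in `sqlite3` module as a stand-in for Postgres, so it needs no server. SQLite (3.24+) accepts the same `on conflict ... do update ... excluded` syntax. However, it uses `?` placeholders instead of `$1` and has no schemas, so the table here is just `subscriptions`. Against Postgres, you'd run the full statement above through your driver of choice.

```python
import sqlite3

# Trimmed-down upsert; sqlite3 stands in for Postgres here.
UPSERT = """
insert into subscriptions (id, plan_id, quantity, updated_idx)
values (?, ?, ?, ?)
on conflict (id) do update set
  plan_id = excluded.plan_id,
  quantity = excluded.quantity,
  updated_idx = excluded.updated_idx
"""


def upsert(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Upsert (id, plan_id, quantity, updated_idx) tuples in one transaction."""
    with conn:  # commits on success, rolls back on exception
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "create table subscriptions ("
    "id text primary key, plan_id text, quantity integer, updated_idx integer)"
)

message = ("sub_rec_1", "plan_basic", 1, 1)
upsert(conn, [message])
upsert(conn, [message])  # a redelivered duplicate leaves the table unchanged
print(conn.execute("select * from subscriptions").fetchall())
# [('sub_rec_1', 'plan_basic', 1, 1)]
```

Processing the duplicate produces exactly one row, which is why redeliveries are safe to ack and move past.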

Likewise, if you’re hard-deleting records, you might run a separate query to delete them like this:

delete from stripe.subscriptions where id = ANY($1); -- $1 is an array of record IDs

Soft deletes vs hard deletes

Sequin’s records each have a deleted property. When a record is deleted in the API, Sequin sets deleted to true.

You can “soft delete” records in your system by upserting deleted records like any other record, storing deleted = true. Alternatively, you can hard delete: remove records from your database when they are deleted in the API.
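If you hard delete, each batch from your consumer has to be split into rows to upsert and IDs to delete. The sketch below assumes records arrive as dicts with `id` and `deleted` keys, and the helper name is hypothetical. When soft deleting, you'd skip this step and upsert every record with its `deleted` flag.

```python
from typing import Any


def partition_for_hard_delete(
    records: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[str]]:
    """Split a batch into records to upsert and IDs to delete (hard-delete mode)."""
    to_upsert: list[dict[str, Any]] = []
    to_delete: list[str] = []
    for record in records:
        if record.get("deleted"):
            to_delete.append(record["id"])
        else:
            to_upsert.append(record)
    return to_upsert, to_delete
```

The `to_delete` list is what you'd bind as the array parameter of a delete query like the one above.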

Examples

See the sidebar for examples in various programming languages and ORMs.

Advanced upserts

We recommend you only worry about this advanced feature after you’re up and running in production with Sequin.

Sequin’s streams and consumers are designed to make syncing to Postgres as simple as possible. Sequin guards against race conditions in delivery, ensuring your workers only see the latest version of a given record.

However, depending on your setup, one race condition can still occur. Say your consumer is configured with an ack_wait of 60s. That means a delivered message will be redelivered after 60s if it hasn’t been acked. If your workers are not configured with hard timeouts, the following can occur:

  1. Worker 1 retrieves message A.
  2. While Worker 1 is processing its message, A is updated in the API and becomes A'. Because A is still unacked, Sequin withholds delivery of A'.
  3. Worker 1 is taking a long time to process. 60s elapse.
  4. The ack_wait for the message expires, so Sequin delivers A' to Worker 2.
  5. Worker 2 upserts A'.
  6. Worker 1 finally completes processing A and upserts A.
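The interleaving above can be replayed deterministically with an in-memory dict standing in for the table. This is a simplified model, not Sequin or Postgres code. It only shows that a last-writer-wins upsert keeps whichever version is written last, not whichever version is newest.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    id: str
    quantity: int
    updated_idx: int


def naive_upsert(table: dict[str, Version], row: Version) -> None:
    """Last writer wins: mirrors an upsert with no where clause."""
    table[row.id] = row


table: dict[str, Version] = {}
a = Version("rec_1", quantity=1, updated_idx=1)        # message A
a_prime = Version("rec_1", quantity=5, updated_idx=2)  # A', the newer version

# Steps 1-4: Worker 1 holds A; after ack_wait expires, Worker 2 receives A'.
naive_upsert(table, a_prime)  # step 5: Worker 2 writes A'
naive_upsert(table, a)        # step 6: Worker 1 finally writes A
print(table["rec_1"].updated_idx)  # 1: the stale version won
```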

In this instance, Worker 1 will have overwritten Worker 2’s upsert. Your database now contains a stale version of the record.

Besides enforcing hard timeouts on your workers, there’s another way to guard against this race condition: add a where clause to your upsert, like this:

insert into stripe.subscriptions (id, stripe_customer_id, ...)
  values ($1, $2, ...)
  on conflict (id)
    do update set 
      stripe_customer_id = excluded.stripe_customer_id,
      -- ...
  where stripe.subscriptions.updated_idx <= excluded.updated_idx;

The final where clause skips the update whenever the stored row’s updated_idx is greater than the incoming one, so an older version of a record can’t overwrite a newer one.
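Replaying the race from the numbered steps with this guard in place shows the stale write being skipped. The sketch uses Python's built-in `sqlite3` module as a stand-in for Postgres. SQLite also supports a `where` clause on `do update` and lets it reference the existing row by table name. It uses `?` placeholders and an unqualified table name, since SQLite has no schemas.

```python
import sqlite3

# Guarded upsert: only apply the update if the incoming version is not older.
GUARDED_UPSERT = """
insert into subscriptions (id, quantity, updated_idx)
values (?, ?, ?)
on conflict (id) do update set
  quantity = excluded.quantity,
  updated_idx = excluded.updated_idx
where subscriptions.updated_idx <= excluded.updated_idx
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table subscriptions (id text primary key, quantity integer, updated_idx integer)"
)

with conn:
    conn.execute(GUARDED_UPSERT, ("rec_1", 5, 2))  # Worker 2 writes A' (updated_idx 2)
    conn.execute(GUARDED_UPSERT, ("rec_1", 1, 1))  # Worker 1 writes stale A: skipped

print(conn.execute("select quantity, updated_idx from subscriptions").fetchone())
# (5, 2)
```

Using `<=` rather than `<` means a redelivered copy of the current version is simply rewritten, which keeps the upsert idempotent.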

Some ORMs support where clauses in upserts, and where they do, our examples for that ORM use it. Most ORMs don’t support this Postgres feature, though. If you want this extra protection and your ORM doesn’t support it, you can use a slightly different strategy:

  1. Open a transaction.
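  2. Select the record you’re going to upsert with select ... for update. This locks the row for the duration of the transaction.
  3. If the record exists, compare updated_idx to the new record’s updated_idx. Proceed with the update if the incoming record is newer.
  4. If the record doesn’t exist, insert it. Because select ... for update can’t lock a row that doesn’t exist yet, a concurrent worker may insert the same record first; be prepared to handle the resulting unique violation, for example by retrying the transaction.
  5. Commit the transaction.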
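The five steps above can be sketched as a read-compare-write inside one transaction. This is a minimal sketch using Python's built-in `sqlite3` module as a stand-in for Postgres. SQLite has no row locks and no `select ... for update`, so `begin immediate` takes the database-wide write lock instead. With Postgres you'd issue `select ... for update` through your driver or ORM. The function name `guarded_write` is hypothetical.

```python
import sqlite3


def guarded_write(conn: sqlite3.Connection, row_id: str, quantity: int, updated_idx: int) -> bool:
    """Read-compare-write in one transaction; returns True if the row was written."""
    conn.execute("begin immediate")  # 1. open a transaction and take the write lock
    try:
        current = conn.execute(  # 2. read the existing row while holding the lock
            "select updated_idx from subscriptions where id = ?", (row_id,)
        ).fetchone()
        if current is None:  # 4. no row yet: insert it
            conn.execute(
                "insert into subscriptions (id, quantity, updated_idx) values (?, ?, ?)",
                (row_id, quantity, updated_idx),
            )
            written = True
        elif current[0] <= updated_idx:  # 3. incoming version is as new or newer: update
            conn.execute(
                "update subscriptions set quantity = ?, updated_idx = ? where id = ?",
                (quantity, updated_idx, row_id),
            )
            written = True
        else:  # 3. incoming version is stale: leave the stored row alone
            written = False
        conn.execute("commit")  # 5. commit, releasing the lock
        return written
    except Exception:
        conn.execute("rollback")
        raise


# isolation_level=None disables sqlite3's implicit transactions so we control them.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "create table subscriptions (id text primary key, quantity integer, updated_idx integer)"
)
print(guarded_write(conn, "rec_1", 5, 2))  # True: A' inserted
print(guarded_write(conn, "rec_1", 1, 1))  # False: stale A skipped
```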