Upsert to Postgres
One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.
You read data from Sequin via consumers. Consumers guarantee at-least-once delivery. This means that with the right upsert logic, you can be confident that your database completely matches the state of the API.
Schema design
By letting you control the schema of your Postgres tables, Sequin makes it easy for you to manage things like table design, migrations, and transforms in your preferred programming language and ORM.
A Sequin record contains a number of standard fields plus all the API-specific fields for a given collection. It’s usually a good idea to store a few of these standard fields as columns in each table:
Field Name | Type | Description |
---|---|---|
id | text | The unique ID of the record. This ID is unique across syncs and providers. You can use this as the primary key in your database. |
upstream_id | text | The ID of the record in the API. Useful for relationships between tables. |
deleted | boolean | Whether the record has been deleted in the API. |
updated_idx | bigint | A sequential index that increments every time the record is updated. Indicates the version of the record. See "Advanced upserts" below for ways you can leverage this field. |
sync.id | text | The ID of the sync this record belongs to. |
sync.external_id | text | The external_id of the sync this record belongs to. You can set external_id to any text value you’d like to correlate a sync to a user or account in your system. |
sync.metadata | jsonb | The metadata of the sync this record belongs to. You can store any information you’d like in metadata in order to correlate a sync to a user or account in your system. |
An example create table statement that leverages a record’s fields might look like this:
create table stripe.subscriptions (
id text primary key,
upstream_id text,
deleted boolean,
updated_idx bigint,
sync_id text,
sync_external_id text,
sync_metadata jsonb,
-- these are often helpful to store too
upstream_updated_at timestamp with time zone,
upstream_created_at timestamp with time zone,
-- ...
-- add record-specific fields here. For example, for Stripe subscriptions, you'd add columns for `stripe_customer_id`, `plan_id`, `quantity`, etc.
-- ...
);
It’s a good idea to add some indexes as well:
create index stripe_subscriptions_sync_external_id_idx
on stripe.subscriptions (sync_external_id);
-- you might also consider adding indexes to any other ID columns to make joins etc faster
create index stripe_subscriptions_upstream_id_idx
on stripe.subscriptions (upstream_id);
Upserting
Each record in Sequin has a collection_slug that you can use to determine which table it belongs to.
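As a minimal sketch of that routing step, a worker could keep a lookup from collection_slug to table name. The slug values below are hypothetical placeholders (this page does not list real slugs), and `table_for` is an illustrative helper, not part of Sequin.

```python
# Hypothetical mapping from a record's collection_slug to a Postgres table.
# The slug strings are placeholders; substitute the slugs your syncs emit.
TABLE_BY_COLLECTION = {
    "stripe:subscription": "stripe.subscriptions",
    "github:issue": "github.issues",
}


def table_for(record: dict) -> str:
    """Return the destination table for a record, or raise if it is unmapped."""
    slug = record["collection_slug"]
    try:
        return TABLE_BY_COLLECTION[slug]
    except KeyError:
        raise ValueError(f"no table configured for collection {slug!r}") from None
```

Failing loudly on an unmapped slug is a design choice here: it keeps records from being silently dropped when a new collection is added to a sync.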
In raw SQL, an upsert of Sequin data into a table might look like this:
insert into stripe.subscriptions (id, stripe_customer_id, plan_id, quantity, sync_metadata, sync_external_id, updated_at, created_at, updated_idx)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
on conflict (id)
do update set
stripe_customer_id = excluded.stripe_customer_id,
plan_id = excluded.plan_id,
quantity = excluded.quantity,
sync_metadata = excluded.sync_metadata,
sync_external_id = excluded.sync_external_id,
updated_at = excluded.updated_at,
updated_idx = excluded.updated_idx;
Likewise, if you’re hard-deleting records, you might run a separate query to delete them like this:
delete from github.issues where id = ANY($1);
Soft deletes vs hard deletes
Sequin’s records each have a deleted property. When a record is deleted in the API, Sequin sets deleted to true.
You can “soft delete” records in your system by simply upserting deleted records (and setting e.g. deleted to true). Alternatively, you can drop records from your database when they are deleted in the API.
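The two approaches can be sketched as a small planning step that runs before any SQL. This assumes each record is a dict with `id` and `deleted` keys, and `plan_batch` is a hypothetical helper name.

```python
def plan_batch(records: list[dict], hard_delete: bool) -> tuple[list[dict], list[str]]:
    """Split a batch into (records to upsert, IDs to delete).

    With soft deletes, every record is upserted, including those whose
    `deleted` flag is true. With hard deletes, deleted records are pulled
    out and only their IDs are returned, ready to pass as the array
    parameter of a `delete ... where id = ANY($1)` query.
    """
    if not hard_delete:
        return list(records), []
    to_upsert = [r for r in records if not r["deleted"]]
    to_delete = [r["id"] for r in records if r["deleted"]]
    return to_upsert, to_delete
```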
Examples
See the sidebar for examples in various programming languages and ORMs.
Advanced upserts
We recommend you only worry about this advanced feature after you’re up and running in production with Sequin.
Sequin’s streams and consumers are designed to make syncing to Postgres as simple as possible. Sequin guards against race conditions in delivery, ensuring your workers only see the latest version of a given record.
However, depending on your setup, one race condition can occur. Let’s say your consumer is set with an ack_wait of 60s. That means that delivered messages will be redelivered after 60s if they’re not ack’d. If your workers are not configured with hard timeouts, the following can occur:
- Worker 1 retrieves message A.
- While Worker 1 is processing its message, A is updated in the API and becomes A'. Sequin continues to withhold delivery of the message.
- Worker 1 is taking a long time to process. 60s elapse.
- The ack_wait for the message expires, so Sequin delivers A' to Worker 2.
- Worker 2 upserts A'.
- Worker 1 finally completes processing A and upserts A.
In this instance, Worker 1 will have overwritten Worker 2’s upsert. Your database now contains a stale version of the record.
Outside of having hard timeouts on your workers, there’s another way to guard against this race condition. You can modify your upsert clause so that it looks like this:
insert into stripe.subscriptions (id, stripe_customer_id, ...)
values ($1, $2, ...)
on conflict (id)
do update set
stripe_customer_id = excluded.stripe_customer_id,
-- ...
where stripe.subscriptions.updated_idx <= excluded.updated_idx;
The final where clause in the upsert prevents old records from overwriting new ones.
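To see why the comparison helps, here is a small in-memory simulation of the Worker 1 / Worker 2 scenario. It is only an illustration: a dict stands in for the table, `guarded_upsert` is a hypothetical helper, and the record values are made up.

```python
def guarded_upsert(table: dict, record: dict) -> bool:
    """Write `record` unless the stored row has a higher updated_idx.

    Mirrors `where existing.updated_idx <= excluded.updated_idx`:
    returns True if the write was applied, False if it was skipped.
    """
    existing = table.get(record["id"])
    if existing is not None and existing["updated_idx"] > record["updated_idx"]:
        return False
    table[record["id"]] = record
    return True


table = {}
a = {"id": "rec_1", "quantity": 1, "updated_idx": 1}        # stale version A
a_prime = {"id": "rec_1", "quantity": 2, "updated_idx": 2}  # newer version A'

guarded_upsert(table, a_prime)  # Worker 2 upserts A' first
guarded_upsert(table, a)        # Worker 1's late upsert of A is skipped
```

After both calls the stored row is still A', whereas an unguarded write would have left A in place.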
Some ORMs support where clauses in upserts, and when they do we showcase it in our examples for that ORM. But most ORMs don’t support this Postgres feature. So, if you want to implement this extra protection and your ORM doesn’t support it, you can use a slightly different strategy:
- Open a transaction.
- Select the record you’re going to upsert for update. This will lock the record for the duration of the transaction.
- If the record exists, compare updated_idx to the new record’s updated_idx. Proceed with the update if the incoming record is newer.
- If the record doesn’t exist, insert it.
- Close the transaction.
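The steps above can be sketched without an ORM as follows. This is a sketch under stated assumptions, not a definitive implementation: `conn` is assumed to be a DB-API 2.0 connection to Postgres with autocommit off and the `%s` parameter style (as in psycopg), `upsert_if_newer` is a hypothetical helper, and only the quantity column is written to keep the example short.

```python
SELECT_FOR_UPDATE = (
    "select updated_idx from stripe.subscriptions where id = %s for update"
)
INSERT_ROW = (
    "insert into stripe.subscriptions (id, quantity, updated_idx) "
    "values (%s, %s, %s)"
)
UPDATE_ROW = (
    "update stripe.subscriptions set quantity = %s, updated_idx = %s "
    "where id = %s"
)


def upsert_if_newer(conn, record: dict) -> bool:
    """Apply `record` in one transaction unless the stored row is newer.

    Returns True if a row was inserted or updated, False if the incoming
    record was stale and therefore skipped.
    """
    cur = conn.cursor()
    try:
        # Lock the existing row (if any) until commit or rollback.
        cur.execute(SELECT_FOR_UPDATE, (record["id"],))
        row = cur.fetchone()
        if row is None:
            cur.execute(
                INSERT_ROW,
                (record["id"], record["quantity"], record["updated_idx"]),
            )
            applied = True
        elif row[0] <= record["updated_idx"]:
            # Same comparison as the `<=` in the where clause shown earlier.
            cur.execute(
                UPDATE_ROW,
                (record["quantity"], record["updated_idx"], record["id"]),
            )
            applied = True
        else:
            applied = False
        conn.commit()
        return applied
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

One caveat: when the row does not exist yet, the select locks nothing, so two workers inserting the same id at the same time can collide on the primary key. In this sketch the losing insert raises, the transaction rolls back, and the message can be processed again on redelivery.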