Read more about upserting to Postgres.

You’ll be processing Sequin records in your application code. A record has this shape:

type SequinRecord = {
    collectionID: string;
    data: { [key: string]: any };
    deleted: boolean;
    insertedAt: Date;
    provider: string;
    streamID: string;
    sync: { 
      id: string;
      external_id: string;
      metadata: { [key: string]: any };
    updatedIdx: number;
    upstreamID: string;
    upstreamCreatedAt: Date;
    upstreamUpdatedAt: Date;

See below for examples on upserting Sequin records to Postgres in popular JavaScript database clients and ORMs:

According to its website, Drizzle is a modern, lightweight ORM for Node.js and TypeScript. It provides a simple and intuitive way to interact with databases, including PostgreSQL.

To perform an upsert using Drizzle, you can use Drizzle’s query builder with its onConflictDoUpdate method. Here’s an example:

import { drizzle } from "drizzle-orm/postgres-js";
import { integer, pgTable, text, timestamp } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import postgres from "postgres";

// Init a PostgreSQL connection
const queryClient = postgres(

// Create a Drizzle database instance
const db = drizzle(queryClient);

// Define the Subscription schema
const subscriptions = pgTable("subscription", {
  id: text("id").primaryKey(),
  upstreamId: text("upstream_id"),
  internalId: text("internal_id"),
  updatedAt: timestamp("updated_at"),
  insertedAt: timestamp("inserted_at"),
  updatedIdx: integer("updated_idx"),
  // define fields that you care about
  stripeCustomerId: text("stripe_customer_id"),
  planId: text("plan_id"),
  quantity: integer("quantity"),
  metadata: jsonb("metadata"),

// TODO: Define any other schemas here

async function upsertRecord(record: SequinRecord) {
  switch (record.collection_id) {
    case "stripe:subscription":
      return upsertSubscription(record);
    // TODO: Handle other collections here

async function upsertSubscription(record: SequinRecord) {
  const { id, upstreamId, updatedIdx, sync, } = record;

  const values = {
    // Map the Stripe values you care about to columns here
    // Stripe API fields are snake-cased
    upstreamId: upstreamId,
    stripeCustomerId: data.stripe_customer_id,
    planId: data.plan_id,
    quantity: data.quantity,
    // ...

  await db
      internalId: sync.external_id,
      updatedAt: new Date(),
      insertedAt: new Date(),
      // advanced protection: only update the record if the incoming updated_idx (version) is greater than or equal to existing
      setWhere: sql`subscription.updated_idx <= excluded.updated_idx`,
      set: {
        internalId: sync.externalId,
        updatedAt: new Date(),