Read more about upserting to Postgres.

You’ll be processing Sequin records in your application code. A record has this shape:

type SequinRecord = {
    collectionID: string;
    data: { [key: string]: any };
    deleted: boolean;
    insertedAt: Date;
    provider: string;
    streamID: string;
    sync: { 
      id: string;
      external_id: string;
      metadata: { [key: string]: any };
    };
    updatedIdx: number;
    upstreamID: string;
    upstreamCreatedAt: Date;
    upstreamUpdatedAt: Date;
};

See below for examples on upserting Sequin records to Postgres in popular JavaScript database clients and ORMs:

According to its website, Drizzle is a modern, lightweight ORM for Node.js and TypeScript. It provides a simple and intuitive way to interact with databases, including PostgreSQL.

To perform an upsert using Drizzle, you can use Drizzle’s query builder with its onConflictDoUpdate method. Here’s an example:

import { drizzle } from "drizzle-orm/postgres-js";
import { integer, pgTable, text, timestamp } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import postgres from "postgres";

// Init a PostgreSQL connection
const queryClient = postgres(
  "psql://<your-postgres-connection-string>"
);

// Create a Drizzle database instance
const db = drizzle(queryClient);

// Define the Subscription schema
const subscriptions = pgTable("subscription", {
  id: text("id").primaryKey(),
  upstreamId: text("upstream_id"),
  internalId: text("internal_id"),
  updatedAt: timestamp("updated_at"),
  insertedAt: timestamp("inserted_at"),
  updatedIdx: integer("updated_idx"),
  // define fields that you care about
  stripeCustomerId: text("stripe_customer_id"),
  planId: text("plan_id"),
  quantity: integer("quantity"),
  metadata: jsonb("metadata"),
});

// TODO: Define any other schemas here

async function upsertRecord(record: SequinRecord) {
  switch (record.collection_id) {
    case "stripe:subscription":
      return upsertSubscription(record);
    // TODO: Handle other collections here
  }
}

async function upsertSubscription(record: SequinRecord) {
  const { id, upstreamId, updatedIdx, sync, ...data } = record;

  const values = {
    // Map the Stripe values you care about to columns here
    // Stripe API fields are snake-cased
    upstreamId: upstreamId,
    stripeCustomerId: data.stripe_customer_id,
    planId: data.plan_id,
    quantity: data.quantity,
    // ...
  }

  await db
    .insert(subscriptions)
    .values({
      id,
      ...values,
      internalId: sync.external_id,
      updatedAt: new Date(),
      insertedAt: new Date(),
      updatedIdx,
    })
    .onConflictDoUpdate({
      target: subscriptions.id,
      // advanced protection: only update the record if the incoming updated_idx (version) is greater than or equal to existing
      setWhere: sql`subscription.updated_idx <= excluded.updated_idx`,
      set: {
        ...values,
        internalId: sync.externalId,
        updatedAt: new Date(),
        updatedIdx,
      },
    })
    .returning();
}