Read more about upserting to Postgres. You’ll be processing Sequin records in your application code. A record has this shape:
type SequinRecord = {
collectionID: string;
data: { [key: string]: any };
deleted: boolean;
insertedAt: Date;
provider: string;
streamID: string;
sync: {
id: string;
external_id: string;
metadata: { [key: string]: any };
};
updatedIdx: number;
upstreamID: string;
upstreamCreatedAt: Date;
upstreamUpdatedAt: Date;
};
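To make the shape concrete, here is a minimal sketch of a record literal that satisfies the type, plus a small helper that splits a collection ID. Every value in `exampleRecord` is invented for illustration, and `parseCollectionID` is a hypothetical helper that assumes collection IDs follow the `provider:collection` pattern seen in `"stripe:subscription"`.

```typescript
// Mirrors the record shape shown above.
type SequinRecord = {
  collectionID: string;
  data: { [key: string]: any };
  deleted: boolean;
  insertedAt: Date;
  provider: string;
  streamID: string;
  sync: {
    id: string;
    external_id: string;
    metadata: { [key: string]: any };
  };
  updatedIdx: number;
  upstreamID: string;
  upstreamCreatedAt: Date;
  upstreamUpdatedAt: Date;
};

// Hypothetical record: all values below are made up for this example.
const exampleRecord: SequinRecord = {
  collectionID: "stripe:subscription",
  data: { plan_id: "plan_basic", quantity: 2 },
  deleted: false,
  insertedAt: new Date("2024-01-01T00:00:00Z"),
  provider: "stripe",
  streamID: "stream_1",
  sync: { id: "sync_1", external_id: "user_42", metadata: {} },
  updatedIdx: 7,
  upstreamID: "sub_123",
  upstreamCreatedAt: new Date("2023-12-31T00:00:00Z"),
  upstreamUpdatedAt: new Date("2024-01-01T00:00:00Z"),
};

// Splits an ID such as "stripe:subscription" at the first colon.
// Assumption: IDs always contain a colon; anything else is rejected.
function parseCollectionID(collectionID: string): {
  provider: string;
  collection: string;
} {
  const separator = collectionID.indexOf(":");
  if (separator === -1) {
    throw new Error(`Unexpected collection ID: ${collectionID}`);
  }
  return {
    provider: collectionID.slice(0, separator),
    collection: collectionID.slice(separator + 1),
  };
}
```

The upstream fields you care about live under `data`, while `updatedIdx` acts as the record's version.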
- Drizzle
- Prisma
- TypeORM
According to its website, Drizzle is a modern, lightweight ORM for Node.js and TypeScript. It provides a simple and intuitive way to interact with databases, including PostgreSQL.
To perform an upsert using Drizzle, you can use Drizzle’s query builder with its onConflictDoUpdate method. Here’s an example:

import { drizzle } from "drizzle-orm/postgres-js";
import { integer, jsonb, pgTable, text, timestamp } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import postgres from "postgres";
// Init a PostgreSQL connection
const queryClient = postgres(
"postgres://<your-postgres-connection-string>"
);
// Create a Drizzle database instance
const db = drizzle(queryClient);
// Define the Subscription schema
const subscriptions = pgTable("subscription", {
id: text("id").primaryKey(),
upstreamId: text("upstream_id"),
internalId: text("internal_id"),
updatedAt: timestamp("updated_at"),
insertedAt: timestamp("inserted_at"),
updatedIdx: integer("updated_idx"),
// define fields that you care about
stripeCustomerId: text("stripe_customer_id"),
planId: text("plan_id"),
quantity: integer("quantity"),
metadata: jsonb("metadata"),
});
// TODO: Define any other schemas here
async function upsertRecord(record: SequinRecord) {
switch (record.collectionID) {
case "stripe:subscription":
return upsertSubscription(record);
// TODO: Handle other collections here
}
}
async function upsertSubscription(record: SequinRecord) {
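// The record shape above has no id field, so this example assumes the
// upstream ID doubles as the primary key; substitute your own key if needed
const { upstreamID, updatedIdx, sync, data } = record;
const id = upstreamID;
const values = {
// Map the Stripe values you care about to columns here
// Stripe API fields are snake-cased
upstreamId: upstreamID,
stripeCustomerId: data.stripe_customer_id,
planId: data.plan_id,
quantity: data.quantity,
// ...
};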
await db
.insert(subscriptions)
.values({
id,
...values,
internalId: sync.external_id,
updatedAt: new Date(),
insertedAt: new Date(),
updatedIdx,
})
.onConflictDoUpdate({
target: subscriptions.id,
// advanced protection: only update the record if the incoming updated_idx (version) is greater than or equal to existing
setWhere: sql`subscription.updated_idx <= excluded.updated_idx`,
set: {
...values,
internalId: sync.external_id,
updatedAt: new Date(),
updatedIdx,
},
})
.returning();
}
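The effect of the setWhere guard can be illustrated without a database. The sketch below is a simplified in-memory model, not Drizzle code: `Row` and `guardedUpsert` are hypothetical names, and a `Map` stands in for the table. It applies the same rule as the clause above: a new ID is inserted, and an existing row is overwritten only when its `updatedIdx` is less than or equal to the incoming one.

```typescript
type Row = { id: string; updatedIdx: number; quantity: number };

// Inserts the row when the id is new; otherwise updates only when
// existing.updatedIdx <= incoming.updatedIdx. Returns true if the table changed.
function guardedUpsert(table: Map<string, Row>, incoming: Row): boolean {
  const existing = table.get(incoming.id);
  if (existing !== undefined && existing.updatedIdx > incoming.updatedIdx) {
    return false; // stale write: keep the newer row that is already stored
  }
  table.set(incoming.id, incoming);
  return true;
}

const table = new Map<string, Row>();
const first = guardedUpsert(table, { id: "sub_1", updatedIdx: 5, quantity: 2 });
const stale = guardedUpsert(table, { id: "sub_1", updatedIdx: 3, quantity: 9 });
const same = guardedUpsert(table, { id: "sub_1", updatedIdx: 5, quantity: 4 });
```

In Postgres, a conflicting row that fails the WHERE condition is simply left untouched, so records that arrive out of order do not overwrite newer data.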
According to its website, Prisma is a modern database toolkit that provides an easy-to-use, type-safe ORM for Node.js and TypeScript. It supports multiple databases, including PostgreSQL.
To perform an upsert using Prisma, you can use the upsert method. Here’s an example of upserting a Stripe Subscription record:

import { PrismaClient } from "@prisma/client";
const prisma = new PrismaClient();
async function upsertRecord(record: SequinRecord) {
switch (record.collectionID) {
case "stripe:subscription":
return upsertSubscription(record);
// TODO: Handle other collections here
}
}
async function upsertSubscription(record: SequinRecord) {
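// The record shape above has no id field, so this example assumes the
// upstream ID doubles as the primary key; substitute your own key if needed
const { upstreamID, updatedIdx, sync, data } = record;
const id = upstreamID;
const values = {
// Map the Stripe values you care about to columns here
// Stripe API fields are snake-cased
upstreamId: upstreamID,
stripeCustomerId: data.stripe_customer_id,
planId: data.plan_id,
quantity: data.quantity,
metadata: data.metadata,
// ...
};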
await prisma.subscriptions.upsert({
where: {
id,
},
create: {
id,
...values,
internalId: sync.external_id,
updatedAt: new Date(),
insertedAt: new Date(),
updatedIdx,
},
update: {
...values,
internalId: sync.external_id,
updatedAt: new Date(),
updatedIdx,
},
});
}
According to its website, TypeORM is an ORM for JavaScript and TypeScript that supports multiple databases, including PostgreSQL.
To perform an upsert using TypeORM, you can use createQueryBuilder with orUpdate to craft a SQL query. Here’s an example of upserting a Stripe Subscription record:

import "reflect-metadata";
import { DataSource, Entity, PrimaryColumn, Column } from "typeorm";
@Entity()
export class Subscription {
// The id is supplied on insert, so it is a plain primary column rather than a generated one
@PrimaryColumn()
id: string;
@Column()
upstream_id: string;
@Column()
internal_id: string;
@Column()
updated_at: Date;
@Column()
inserted_at: Date;
@Column()
updated_idx: number;
// Rest of the columns you're syncing
@Column()
stripe_customer_id: string;
@Column()
plan_id: string;
@Column()
quantity: number;
@Column("jsonb")
metadata: Record<string, any>;
}
export const AppDataSource = new DataSource({
type: "postgres",
// replace with your connection string
url: "postgres://<your-postgres-connection-string>",
entities: [Subscription],
});
async function upsertRecord(record: SequinRecord) {
switch (record.collectionID) {
case "stripe:subscription":
return upsertSubscription(record);
// TODO: Handle other collections here
}
}
async function upsertSubscription(record: SequinRecord) {
// The record shape above has no id field, so this example assumes the
// upstream ID doubles as the primary key; substitute your own key if needed
const { upstreamID, updatedIdx, sync, data } = record;
const id = upstreamID;
const values = {
// Map the Stripe values you care about to the entity's properties here
// Stripe API fields are snake-cased
upstream_id: upstreamID,
stripe_customer_id: data.stripe_customer_id,
plan_id: data.plan_id,
quantity: data.quantity,
metadata: data.metadata,
// ...
};
await AppDataSource.createQueryBuilder()
.insert()
.into(Subscription)
.values({
id,
...values,
internal_id: sync.external_id,
updated_at: new Date(),
inserted_at: new Date(),
updated_idx: updatedIdx,
})
.orUpdate(
["internal_id", "updated_at", "updated_idx", "stripe_customer_id", "plan_id", "quantity", "metadata"], // columns to update
["id"] // conflict target
)
.execute();
}
AppDataSource.initialize()
.then(async () => {
// `record` is a SequinRecord received by your application
await upsertRecord(record);
})
.catch((error) =>
console.error("Error during Data Source initialization or upsert:", error)
);
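All three examples ultimately issue a Postgres INSERT ... ON CONFLICT ... DO UPDATE statement, but only the Drizzle example adds a version check on updated_idx. As a rough sketch of what a guarded statement looks like in plain SQL, the hypothetical helper below builds the parameterized text. `buildGuardedUpsert` is not part of any of the libraries above, and it assumes the table and column names are trusted, hard-coded identifiers (they are not escaped).

```typescript
// Builds a parameterized upsert that updates only when the stored version
// is less than or equal to the incoming one.
// Assumption: `table` and all column names are trusted identifiers.
function buildGuardedUpsert(
  table: string,
  columns: string[],
  conflictColumn: string,
  versionColumn: string
): string {
  // One positional placeholder per column: $1, $2, ...
  const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
  // Every column except the conflict target is overwritten from the incoming row
  const assignments = columns
    .filter((column) => column !== conflictColumn)
    .map((column) => `${column} = excluded.${column}`)
    .join(", ");
  return [
    `INSERT INTO ${table} (${columns.join(", ")})`,
    `VALUES (${placeholders})`,
    `ON CONFLICT (${conflictColumn}) DO UPDATE SET ${assignments}`,
    `WHERE ${table}.${versionColumn} <= excluded.${versionColumn}`,
  ].join("\n");
}

const statement = buildGuardedUpsert(
  "subscription",
  ["id", "quantity", "updated_idx"],
  "id",
  "updated_idx"
);
```

The resulting text could be passed to a raw-query facility together with the row values in column order, which is one way to get the same stale-write protection where an ORM's upsert helper does not expose a conflict-time condition.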

