Upsert to Postgres
Upsert to Postgres (JavaScript)
One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.
Read more about upserting to Postgres.
You’ll be processing Sequin records in your application code. A record has this shape:
type SequinRecord = {
collectionID: string;
data: { [key: string]: any };
deleted: boolean;
insertedAt: Date;
provider: string;
streamID: string;
sync: {
id: string;
external_id: string;
metadata: { [key: string]: any };
};
updatedIdx: number;
upstreamID: string;
upstreamCreatedAt: Date;
upstreamUpdatedAt: Date;
};
See below for examples on upserting Sequin records to Postgres in popular JavaScript database clients and ORMs:
According to its website, Drizzle is a modern, lightweight ORM for Node.js and TypeScript. It provides a simple and intuitive way to interact with databases, including PostgreSQL.
To perform an upsert using Drizzle, you can use Drizzle’s query builder with its onConflictDoUpdate
method. Here’s an example:
import { drizzle } from "drizzle-orm/postgres-js";
import { integer, pgTable, text, timestamp } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import postgres from "postgres";
// Init a PostgreSQL connection
const queryClient = postgres(
"psql://<your-postgres-connection-string>"
);
// Create a Drizzle database instance
const db = drizzle(queryClient);
// Define the Subscription schema
const subscriptions = pgTable("subscription", {
id: text("id").primaryKey(),
upstreamId: text("upstream_id"),
internalId: text("internal_id"),
updatedAt: timestamp("updated_at"),
insertedAt: timestamp("inserted_at"),
updatedIdx: integer("updated_idx"),
// define fields that you care about
stripeCustomerId: text("stripe_customer_id"),
planId: text("plan_id"),
quantity: integer("quantity"),
metadata: jsonb("metadata"),
});
// TODO: Define any other schemas here
async function upsertRecord(record: SequinRecord) {
switch (record.collection_id) {
case "stripe:subscription":
return upsertSubscription(record);
// TODO: Handle other collections here
}
}
async function upsertSubscription(record: SequinRecord) {
const { id, upstreamId, updatedIdx, sync, ...data } = record;
const values = {
// Map the Stripe values you care about to columns here
// Stripe API fields are snake-cased
upstreamId: upstreamId,
stripeCustomerId: data.stripe_customer_id,
planId: data.plan_id,
quantity: data.quantity,
// ...
}
await db
.insert(subscriptions)
.values({
id,
...values,
internalId: sync.external_id,
updatedAt: new Date(),
insertedAt: new Date(),
updatedIdx,
})
.onConflictDoUpdate({
target: subscriptions.id,
// advanced protection: only update the record if the incoming updated_idx (version) is greater than or equal to existing
setWhere: sql`subscription.updated_idx <= excluded.updated_idx`,
set: {
...values,
internalId: sync.externalId,
updatedAt: new Date(),
updatedIdx,
},
})
.returning();
}