Read more about upserting to Postgres. See below for examples of upserting Sequin records to Postgres in popular Python database clients and ORMs:
Documentation Index
Fetch the complete documentation index at: https://sequin.io/docs/llms.txt
Use this file to discover all available pages before exploring further.
- psycopg2
- Django ORM
According to its website, psycopg2 is a popular PostgreSQL database adapter for Python. Here’s an example of how you can structure upsert logic in psycopg2:
import psycopg2
import json
from datetime import datetime
# Open the PostgreSQL connection shared by the upsert helpers below
conn = psycopg2.connect(
    host="yourhost",
    database="yourdb",
    user="youruser",
    password="yourpassword",
)

# Cursor used to run the upsert statements
cur = conn.cursor()
# Define the upsert function
def upsert_record(record):
    """Route a Sequin record to the upsert handler for its collection.

    Args:
        record: Mapping with a "collection_id" key naming the collection
            the record belongs to.

    Returns:
        Whatever the collection's handler returns, or None when no handler
        exists for the record's collection.

    Raises:
        KeyError: If the record has no "collection_id" key.
    """
    collection_id = record["collection_id"]
    if collection_id == "stripe:subscription":
        return upsert_subscription(record)
    # TODO: Handle other collections here
    return None
def upsert_subscription(record):
    """Insert or update one row in the subscriptions table from a record.

    The row is keyed on ``id``. On conflict, the existing row is only
    overwritten when its ``updated_idx`` is less than or equal to the
    incoming one, so an older record never clobbers a newer row.

    Uses the module-level ``conn`` and ``cur`` objects.

    Args:
        record: Mapping with "id", "upstream_id", "updated_idx", a "data"
            mapping ("stripe_customer_id", "plan_id", "quantity") and a
            "sync" mapping ("external_id").

    Raises:
        KeyError: If any of the keys above is missing.
    """
    record_id = record["id"]  # not named `id`, to avoid shadowing the builtin
    upstream_id = record["upstream_id"]
    updated_idx = record["updated_idx"]
    data = record["data"]
    sync = record["sync"]

    # Prepare the SQL query
    query = """
INSERT INTO subscriptions (id, upstream_id, stripe_customer_id, plan_id, quantity, internal_id, updated_at, created_at, updated_idx)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE
SET stripe_customer_id = excluded.stripe_customer_id,
plan_id = excluded.plan_id,
quantity = excluded.quantity,
internal_id = excluded.internal_id,
updated_at = excluded.updated_at,
updated_idx = excluded.updated_idx
WHERE subscriptions.updated_idx <= excluded.updated_idx;
"""

    # Take one timestamp so a freshly inserted row has identical
    # created_at and updated_at values.
    now = datetime.now()

    # `with conn` commits when the block succeeds and rolls back when it
    # raises, so a failed statement does not leave the shared connection
    # stuck in an aborted transaction.
    with conn:
        cur.execute(query, (
            record_id,
            upstream_id,
            data["stripe_customer_id"],
            data["plan_id"],
            data["quantity"],
            sync["external_id"],
            now,
            now,
            updated_idx,
        ))
# Example usage
record = {
    "id": "02819s-subscription-1",
    "upstream_id": "sub_9f2c4d8b3a6",
    "updated_idx": 42,
    "data": {
        "stripe_customer_id": "cus_e4d5b7a89f2",
        "plan_id": "pro_24c85b9d7ea_yearly",
        "quantity": 3,
    },
    "sync": {
        "external_id": "cust_7b9d8f4a6e2",
    },
    # upsert_record dispatches on this key, so it must be present
    "collection_id": "stripe:subscription",
}

try:
    upsert_record(record)
finally:
    # Close the cursor and connection even if the upsert fails
    cur.close()
    conn.close()
According to its website, Django ORM is a high-level Python Web framework that encourages rapid development and clean, pragmatic design. You can define a Subscription model like this; then, after the model, here’s how you’d perform upserts:
from django.db import models
from django.utils import timezone
class Subscription(models.Model):
    """Django model for a subscription row, keyed by a string id."""

    # Identifiers
    id = models.CharField(primary_key=True, max_length=255)
    upstream_id = models.CharField(max_length=255)

    # Subscription details
    stripe_customer_id = models.CharField(max_length=255)
    plan_id = models.CharField(max_length=255)
    quantity = models.IntegerField()
    internal_id = models.CharField(max_length=255)

    # Timestamps filled in by Django on save / on first save
    updated_at = models.DateTimeField(auto_now=True)
    created_at = models.DateTimeField(auto_now_add=True)

    # Index value carried on each incoming record
    updated_idx = models.BigIntegerField()
from django.db import transaction
from myapp.models import Subscription
def upsert_record(record):
    """Dispatch a record to the upsert routine for its collection.

    Records whose "collection_id" has no routine yet are ignored.
    """
    if record["collection_id"] != "stripe:subscription":
        # TODO: Handle other collections here
        return
    upsert_subscription(record)
def upsert_subscription(record):
    """Create or update the Subscription row matching ``record["id"]``.

    The row is looked up by primary key only. An existing row whose
    ``updated_idx`` is greater than the incoming one is left untouched, so
    an older record never overwrites a newer row.

    Args:
        record: Mapping with "id", "upstream_id", "updated_idx", a "data"
            mapping ("stripe_customer_id", "plan_id", "quantity") and a
            "sync" mapping ("external_id").

    Raises:
        KeyError: If any of the keys above is missing.
    """
    record_id = record["id"]  # not named `id`, to avoid shadowing the builtin
    updated_idx = record["updated_idx"]
    data = record["data"]
    sync = record["sync"]

    # Values written both when the row is created and when it is updated.
    # upstream_id lives here rather than in the lookup: as a lookup field, an
    # existing id with a different upstream_id would not match, and the
    # fallback create would then collide on the primary key.
    defaults = {
        "upstream_id": record["upstream_id"],
        "stripe_customer_id": data["stripe_customer_id"],
        "plan_id": data["plan_id"],
        "quantity": data["quantity"],
        "internal_id": sync["external_id"],
        "updated_idx": updated_idx,
    }

    with transaction.atomic():
        # Lock the existing row (if any) so the staleness check and the
        # write below happen against the same state.
        current = (
            Subscription.objects.select_for_update()
            .filter(id=record_id)
            .first()
        )
        if current is not None and current.updated_idx > updated_idx:
            # Incoming record is older than what is stored; skip it.
            return

        # update_or_create fetches the row by id and applies `defaults` to
        # it, or creates a new row from id plus `defaults` if none exists.
        Subscription.objects.update_or_create(id=record_id, defaults=defaults)
# Example usage: a sample subscription record passed through the dispatcher
record = {
    "id": "02819s-subscription-1",
    "upstream_id": "sub_9f2c4d8b3a6",
    "updated_idx": 42,
    "data": {
        "stripe_customer_id": "cus_e4d5b7a89f2",
        "plan_id": "pro_24c85b9d7ea_yearly",
        "quantity": 3,
    },
    "sync": {
        "external_id": "cust_7b9d8f4a6e2",
    },
    "collection_id": "stripe:subscription",
}

upsert_record(record)

