Upsert to Postgres
Upsert to Postgres (Python)
One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.
Read more about upserting to Postgres.
See below for examples of upserting Sequin records to Postgres with popular Python database clients and ORMs:
According to its website, psycopg2 is a popular PostgreSQL database adapter for Python.
Here’s an example of how you can structure upsert logic in psycopg2:
import psycopg2
import json
from datetime import datetime
# Connection settings for the target PostgreSQL database (placeholders)
_connection_settings = {
    "host": "yourhost",
    "database": "yourdb",
    "user": "youruser",
    "password": "yourpassword",
}

# Open the connection, then a cursor used to issue statements on it
conn = psycopg2.connect(**_connection_settings)
cur = conn.cursor()
# Define the upsert function
def upsert_record(record):
    """Route a record to the upsert handler for its collection.

    Args:
        record: Mapping that contains a ``collection_id`` key identifying
            which collection the record belongs to.

    Returns:
        Whatever the matching handler returns, or ``None`` when no handler
        exists for the record's collection.

    Raises:
        KeyError: If ``record`` has no ``collection_id`` key.
    """
    collection_id = record["collection_id"]

    # Python has no ``switch`` statement; a plain comparison does the routing.
    if collection_id == "stripe:subscription":
        return upsert_subscription(record)

    # TODO: Handle other collections here
    return None
def upsert_subscription(record):
    """Insert or update one row in ``subscriptions`` from a record.

    The row is keyed on ``id``. When a row with the same ``id`` already
    exists, its columns are overwritten only if the stored ``updated_idx`` is
    less than or equal to the incoming one, so an older record never replaces
    newer data. ``upstream_id`` and ``created_at`` are not modified on update.

    Uses the module-level ``cur`` and ``conn``. The transaction is committed
    on success; if the statement fails it is rolled back and the error is
    re-raised, so the shared connection stays usable.

    Args:
        record: Mapping with ``id``, ``upstream_id``, ``updated_idx``, a
            ``data`` mapping (``stripe_customer_id``, ``plan_id``,
            ``quantity``) and a ``sync`` mapping (``external_id``).

    Raises:
        KeyError: If any of the fields listed above is missing.
    """
    data = record["data"]
    sync = record["sync"]

    # One timestamp for both columns so a fresh row has
    # created_at == updated_at.
    now = datetime.now()

    # Build every parameter before touching the database so that a malformed
    # record fails with KeyError without opening a transaction.
    params = (
        record["id"],
        record["upstream_id"],
        data["stripe_customer_id"],
        data["plan_id"],
        data["quantity"],
        sync["external_id"],  # stored in the internal_id column
        now,  # updated_at
        now,  # created_at (only used when the row is inserted)
        record["updated_idx"],
    )

    # Prepare the SQL query
    query = """
INSERT INTO subscriptions (id, upstream_id, stripe_customer_id, plan_id, quantity, internal_id, updated_at, created_at, updated_idx)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE
SET stripe_customer_id = excluded.stripe_customer_id,
plan_id = excluded.plan_id,
quantity = excluded.quantity,
internal_id = excluded.internal_id,
updated_at = excluded.updated_at,
updated_idx = excluded.updated_idx
WHERE subscriptions.updated_idx <= excluded.updated_idx;
"""

    # Execute the query with the record data
    try:
        cur.execute(query, params)
    except Exception:
        # Without a rollback the connection would remain in a failed
        # transaction and reject every later statement.
        conn.rollback()
        raise

    # Commit the transaction
    conn.commit()
# Example usage
record = {
    "id": "02819s-subscription-1",
    # upsert_record reads this key to choose the handler for the record
    "collection_id": "stripe:subscription",
    "upstream_id": "sub_9f2c4d8b3a6",
    "updated_idx": 42,
    "data": {
        "stripe_customer_id": "cus_e4d5b7a89f2",
        "plan_id": "pro_24c85b9d7ea_yearly",
        "quantity": 3,
    },
    "sync": {
        "external_id": "cust_7b9d8f4a6e2",
    },
}

try:
    upsert_record(record)
finally:
    # Close the cursor and connection, even if the upsert raised
    cur.close()
    conn.close()