Read more about upserting to Postgres.

See below for examples on upserting Sequin records to Postgres in popular Python database clients and ORMs:

According to its website, psycopg2 is a popular PostgreSQL database adapter for Python.

Here’s an example of how you can structure upsert logic in psycopg2:

import psycopg2
import json
from datetime import datetime

# Establish a connection to the PostgreSQL database
conn = psycopg2.connect(
    host="yourhost",
    database="yourdb",
    user="youruser",
    password="yourpassword"
)

# Create a cursor object
cur = conn.cursor()

# Define the upsert function
def upsert_record(record):
  switch record["collection_id"]
  case "stripe:subscription":
    return upsert_subscription(record)
  # TODO: Handle other collections here

def upsert_subscription(record):
  id = record["id"]
  upstream_id = record["upstream_id"]
  updated_idx = record["updated_idx"]
  data = record["data"]
  sync = record["sync"]

  # Prepare the SQL query
  query = """
  INSERT INTO subscriptions (id, upstream_id, stripe_customer_id, plan_id, quantity, internal_id, updated_at, created_at, updated_idx)
  VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  ON CONFLICT (id) DO UPDATE
  SET stripe_customer_id = excluded.stripe_customer_id,
      plan_id = excluded.plan_id,
      quantity = excluded.quantity,
      internal_id = excluded.internal_id,
      updated_at = excluded.updated_at,
      updated_idx = excluded.updated_idx
  WHERE subscriptions.updated_idx <= excluded.updated_idx;
  """

  # Execute the query with the record data
  cur.execute(query, (
    id,
    upstream_id,
    data["stripe_customer_id"],
    data["plan_id"],
    data["quantity"],
    sync["external_id"],
    datetime.now(),
    datetime.now(),
    updated_idx
  ))

  # Commit the transaction
  conn.commit()

  # Example usage
  record = {
    "id": "02819s-subscription-1",
    "upstream_id": "sub_9f2c4d8b3a6",
    "updated_idx": 42,
      "data": {
          "stripe_customer_id": "cus_e4d5b7a89f2",
          "plan_id": "pro_24c85b9d7ea_yearly",
          "quantity": 3
      },
      "sync": {
          "external_id": "cust_7b9d8f4a6e2"
      }
    }


  upsert_record(record)

  # Close the cursor and connection
  cur.close()
  conn.close()