Skip to main content

Documentation Index

Fetch the complete documentation index at: https://sequin.io/docs/llms.txt

Use this file to discover all available pages before exploring further.

Read more about upserting to Postgres. See below for examples on upserting Sequin records to Postgres in popular Python database clients and ORMs:
According to its website, psycopg2 is a popular PostgreSQL database adapter for Python. Here’s an example of how you can structure upsert logic in psycopg2:
import psycopg2
import json
from datetime import datetime

# Connection settings for the target PostgreSQL database
_connection_settings = {
    "host": "yourhost",
    "database": "yourdb",
    "user": "youruser",
    "password": "yourpassword",
}

# Open the connection, then obtain the cursor used to run statements
conn = psycopg2.connect(**_connection_settings)
cur = conn.cursor()

# Define the upsert function
def upsert_record(record):
    """Route a Sequin record to the upsert handler for its collection.

    Args:
        record: Mapping that carries a ``collection_id`` key naming the
            collection the record belongs to.

    Returns:
        Whatever the matching handler returns, or ``None`` when no handler
        exists yet for the record's collection.

    Raises:
        KeyError: If ``record`` has no ``collection_id`` key.
    """
    # Python has no ``switch`` statement; a plain comparison does the dispatch.
    collection_id = record["collection_id"]
    if collection_id == "stripe:subscription":
        return upsert_subscription(record)
    # TODO: Handle other collections here
    return None

def upsert_subscription(record):
    """Insert or update one row of ``subscriptions`` from a Sequin record.

    A new row gets both ``created_at`` and ``updated_at`` set to the current
    time. On an ``id`` conflict the mutable columns are overwritten, but only
    when the stored ``updated_idx`` is less than or equal to the incoming one;
    ``created_at`` is left untouched on that path.

    Args:
        record: Mapping with the keys ``id``, ``upstream_id``, ``updated_idx``,
            ``data`` (holding ``stripe_customer_id``, ``plan_id`` and
            ``quantity``) and ``sync`` (holding ``external_id``).

    Raises:
        KeyError: If ``record`` lacks one of the keys listed above.
        psycopg2.Error: If the statement or the commit fails; the
            transaction is rolled back before the error is re-raised.
    """
    # Read every field first so a malformed record fails before any SQL runs.
    record_id = record["id"]
    upstream_id = record["upstream_id"]
    updated_idx = record["updated_idx"]
    data = record["data"]
    sync = record["sync"]

    # Prepare the SQL query
    query = """
  INSERT INTO subscriptions (id, upstream_id, stripe_customer_id, plan_id, quantity, internal_id, updated_at, created_at, updated_idx)
  VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  ON CONFLICT (id) DO UPDATE
  SET stripe_customer_id = excluded.stripe_customer_id,
      plan_id = excluded.plan_id,
      quantity = excluded.quantity,
      internal_id = excluded.internal_id,
      updated_at = excluded.updated_at,
      updated_idx = excluded.updated_idx
  WHERE subscriptions.updated_idx <= excluded.updated_idx;
  """

    # One timestamp for both columns so a freshly inserted row has
    # created_at == updated_at.
    now = datetime.now()

    try:
        # Execute the query with the record data
        cur.execute(query, (
            record_id,
            upstream_id,
            data["stripe_customer_id"],
            data["plan_id"],
            data["quantity"],
            sync["external_id"],
            now,
            now,
            updated_idx,
        ))

        # Commit the transaction
        conn.commit()
    except psycopg2.Error:
        # Roll back so the shared connection is usable for the next record.
        conn.rollback()
        raise


# Example usage
record = {
    "id": "02819s-subscription-1",
    # upsert_record dispatches on this key, so the example must carry it.
    "collection_id": "stripe:subscription",
    "upstream_id": "sub_9f2c4d8b3a6",
    "updated_idx": 42,
    "data": {
        "stripe_customer_id": "cus_e4d5b7a89f2",
        "plan_id": "pro_24c85b9d7ea_yearly",
        "quantity": 3,
    },
    "sync": {
        "external_id": "cust_7b9d8f4a6e2",
    },
}

try:
    upsert_record(record)
finally:
    # Close the cursor and connection, even when the upsert fails
    cur.close()
    conn.close()