Read more about upserting to Postgres.

According to its website, Ecto is a database wrapper and ORM that makes it easy to work with databases in Elixir.

To ingest Sequin records with Ecto, you’ll first set up your models like this:

defmodule MyApp.Stripe.Subscription do
  @moduledoc """
  Ecto schema for the `subscriptions` table under the `:stripe` schema prefix.

  The primary key `:id` is a string and is supplied by the caller rather than
  generated by the database.
  """

  use Ecto.Schema
  import Ecto.Changeset

  # Every attribute that `changeset/2` both casts and requires.
  @fields ~w(
    id
    upstream_id
    customer_id
    plan_id
    quantity
    internal_id
    updated_idx
    upstream_updated_at
    upstream_created_at
  )a

  @schema_prefix :stripe
  @primary_key {:id, :string, []}
  schema "subscriptions" do
    field :upstream_id, :string
    field :customer_id, :string
    field :plan_id, :string
    field :quantity, :integer
    field :internal_id, :string
    field :updated_idx, :integer
    field :upstream_updated_at, :naive_datetime
    field :upstream_created_at, :naive_datetime

    timestamps()
  end

  @doc """
  Builds a changeset for `subscription` from `attrs`.

  All attributes listed in `@fields` are cast, and all of them are required;
  a missing one makes the returned changeset invalid.
  """
  def changeset(subscription, attrs) do
    casted = cast(subscription, attrs, @fields)
    validate_required(casted, @fields)
  end
end

You can also define a struct for Sequin records like this:

defmodule Sequin.Record do
  @moduledoc """
  Plain struct holding a single Sequin record.

  Every field defaults to `nil`.
  """

  defstruct ~w(
    id
    upstream_id
    collection_id
    data
    deleted
    inserted_at
    provider
    stream_id
    sync
    updated_idx
    upstream_created_at
    upstream_updated_at
  )a
end

Then, your upsert logic will look like this:

defmodule MyApp.Stripe do
  @moduledoc """
  Upserts Sequin records for Stripe collections into the local Stripe tables.
  """

  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Stripe.Subscription
  alias Sequin.Record

  @doc """
  Upserts a Sequin record, dispatching on its `collection_id`.

  For `"stripe:subscription"` records the record is mapped onto a
  `MyApp.Stripe.Subscription` changeset and inserted. When a row with the same
  `id` already exists, every column except `:upstream_id` is overwritten.

  Returns the result of `Repo.insert/2`. Raises `MatchError` if the record's
  `data` lacks the `"stripe_customer_id"`, `"plan_id"` or `"quantity"` keys.
  """
  def upsert_record(%Record{collection_id: "stripe:subscription"} = record) do
    %{
      id: id,
      upstream_id: upstream_id,
      data: %{
        "stripe_customer_id" => stripe_customer_id,
        "plan_id" => plan_id,
        "quantity" => quantity
      },
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    } = record

    # Map to internal columns
    changeset_attrs = %{
      id: id,
      upstream_id: upstream_id,
      customer_id: stripe_customer_id,
      plan_id: plan_id,
      quantity: quantity,
      internal_id: id,
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    }

    # PostgreSQL needs an explicit conflict target for `ON CONFLICT ... DO UPDATE`;
    # without `:conflict_target` Ecto raises instead of upserting. The primary
    # key `:id` is the column that identifies an existing row.
    %Subscription{}
    |> Subscription.changeset(changeset_attrs)
    |> Repo.insert(
      on_conflict: {:replace_all_except, [:upstream_id]},
      conflict_target: :id
    )
  end

  # Define upsert functions for other Stripe records
  def upsert_record(%Record{collection_id: "stripe:customer"} = _record) do
    # TODO
  end
end