Documentation Index
Fetch the complete documentation index at: https://sequin.io/docs/llms.txt
Use this file to discover all available pages before exploring further.
Read more about upserting to Postgres.
According to its website, Ecto is a database wrapper and ORM that makes it easy to work with databases in Elixir.
To ingest Sequin records with Ecto, you’ll first setup your models like this:
defmodule MyApp.Stripe.Subscription do
use Ecto.Schema
import Ecto.Changeset
@fields [
:id,
:upstream_id,
:customer_id,
:plan_id,
:quantity,
:internal_id,
:updated_idx,
:upstream_updated_at,
:upstream_created_at
]
@schema_prefix :stripe
@primary_key {:id, :string, []}
schema "subscriptions" do
field(:upstream_id, :string)
field(:customer_id, :string)
field(:plan_id, :string)
field(:quantity, :integer)
field(:internal_id, :string)
field(:updated_idx, :integer)
field(:upstream_updated_at, :naive_datetime)
field(:upstream_created_at, :naive_datetime)
timestamps()
end
def changeset(subscription, attrs) do
subscription
|> cast(attrs, @fields)
|> validate_required(@fields)
end
end
You can also define a struct definition for Sequin records like this:
defmodule Sequin.Record do
defstruct [
:id,
:upstream_id,
:collection_id,
:data,
:deleted,
:inserted_at,
:provider,
:stream_id,
:sync,
:updated_idx,
:upstream_created_at,
:upstream_updated_at
]
end
Then, your upsert logic will look like this:
defmodule MyApp.Stripe do
import Ecto.Query
alias MyApp.Repo
alias MyApp.Stripe.Subscription
alias Sequin.Record
def upsert_record(%Record{collection_id: "stripe:subscription"} = record) do
%{
id: id,
upstream_id: upstream_id,
data: %{
"stripe_customer_id" => stripe_customer_id,
"plan_id" => plan_id,
"quantity" => quantity
},
updated_idx: updated_idx,
upstream_created_at: upstream_created_at,
upstream_updated_at: upstream_updated_at
} = record
# Map to internal columns
changeset_attrs = %{
id: id,
upstream_id: upstream_id,
customer_id: stripe_customer_id,
plan_id: plan_id,
quantity: quantity,
internal_id: id,
updated_idx: updated_idx,
upstream_created_at: upstream_created_at,
upstream_updated_at: upstream_updated_at
}
Subscription.changeset(%Subscription{}, changeset_attrs)
|> Repo.insert(
on_conflict: {:replace_all_except, [:upstream_id]}
)
end
# Define upsert functions for other Stripe records
def upsert_record(%Record{collection_id: "stripe:customer"} = _record) do
# TODO
end
end