Upsert to Postgres (Elixir/Ecto)
One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.
Read more about upserting to Postgres.
Ecto is a toolkit for data mapping and language-integrated queries that makes it easy to work with databases in Elixir.
To ingest Sequin records with Ecto, first set up your schema like this:
defmodule MyApp.Stripe.Subscription do
  use Ecto.Schema
  import Ecto.Changeset

  @fields [
    :id,
    :upstream_id,
    :customer_id,
    :plan_id,
    :quantity,
    :internal_id,
    :updated_idx,
    :upstream_updated_at,
    :upstream_created_at
  ]

  @schema_prefix :stripe
  @primary_key {:id, :string, []}
  schema "subscriptions" do
    field(:upstream_id, :string)
    field(:customer_id, :string)
    field(:plan_id, :string)
    field(:quantity, :integer)
    field(:internal_id, :string)
    field(:updated_idx, :integer)
    field(:upstream_updated_at, :naive_datetime)
    field(:upstream_created_at, :naive_datetime)
    timestamps()
  end

  def changeset(subscription, attrs) do
    subscription
    |> cast(attrs, @fields)
    |> validate_required(@fields)
  end
end
You can also define a struct for Sequin records like this:
defmodule Sequin.Record do
  defstruct [
    :id,
    :upstream_id,
    :collection_id,
    :data,
    :deleted,
    :inserted_at,
    :provider,
    :stream_id,
    :sync,
    :updated_idx,
    :upstream_created_at,
    :upstream_updated_at
  ]
end
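If records reach your application as decoded JSON, they will be maps with string keys rather than structs. The sketch below shows one way to build a `Sequin.Record` from such a map. The `MyApp.RecordDecoder` module, its `from_map/1` function, and the sample payload are hypothetical names used only for illustration, and the sketch assumes the payload's top-level keys match the struct's field names.

```elixir
defmodule Sequin.Record do
  # Same fields as the struct above, repeated so this snippet runs on its own.
  defstruct [
    :id,
    :upstream_id,
    :collection_id,
    :data,
    :deleted,
    :inserted_at,
    :provider,
    :stream_id,
    :sync,
    :updated_idx,
    :upstream_created_at,
    :upstream_updated_at
  ]
end

defmodule MyApp.RecordDecoder do
  # Hypothetical helper, not part of Sequin or Ecto.
  alias Sequin.Record

  # Builds a %Sequin.Record{} from a map with string keys.
  # Only top-level keys are converted: the nested "data" map keeps its
  # string keys, and keys the struct does not define are ignored.
  def from_map(%{} = payload) do
    attrs =
      %Record{}
      |> Map.from_struct()
      |> Map.keys()
      |> Map.new(fn key -> {key, Map.get(payload, Atom.to_string(key))} end)

    struct(Record, attrs)
  end
end

# Usage with an illustrative payload (the values are made up):
payload = %{
  "id" => "rec_1",
  "collection_id" => "stripe:subscription",
  "data" => %{"plan_id" => "plan_basic"}
}

record = MyApp.RecordDecoder.from_map(payload)
```

Fields missing from the payload are left as `nil`, so a later pattern match or changeset validation is what decides whether the record is complete.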
Then, your upsert logic will look like this:
defmodule MyApp.Stripe do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Stripe.Subscription
  alias Sequin.Record

  def upsert_record(%Record{collection_id: "stripe:subscription"} = record) do
    # Pull the fields we need out of the record; "data" keeps its string keys
    %{
      id: id,
      upstream_id: upstream_id,
      data: %{
        "stripe_customer_id" => stripe_customer_id,
        "plan_id" => plan_id,
        "quantity" => quantity
      },
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    } = record

    # Map to internal columns
    changeset_attrs = %{
      id: id,
      upstream_id: upstream_id,
      customer_id: stripe_customer_id,
      plan_id: plan_id,
      quantity: quantity,
      internal_id: id,
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    }

    # Insert the row, or update the existing row when the primary key already exists.
    # PostgreSQL requires :conflict_target whenever :on_conflict performs an update.
    %Subscription{}
    |> Subscription.changeset(changeset_attrs)
    |> Repo.insert(
      on_conflict: {:replace_all_except, [:upstream_id]},
      conflict_target: [:id]
    )
  end

  # Define upsert functions for other Stripe records
  def upsert_record(%Record{collection_id: "stripe:customer"} = _record) do
    # TODO
  end
end
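The mapping step inside `upsert_record/1` raises a `MatchError` when a record's `data` map lacks one of the expected keys. As a minimal sketch of an alternative, the mapping can live in a pure function that returns a tagged tuple, which also makes it testable without a database. The `MyApp.Stripe.SubscriptionAttrs` module and its `from_record/1` function are hypothetical names, not part of Sequin or Ecto, and the sample values are made up.

```elixir
defmodule MyApp.Stripe.SubscriptionAttrs do
  # Hypothetical helper: maps a Sequin record to changeset attributes
  # without touching the database.

  # Keys that upsert_record/1 expects to find inside the record's data map.
  @data_keys ["stripe_customer_id", "plan_id", "quantity"]

  # Accepts a struct or plain map with atom keys at the top level.
  # Returns {:ok, attrs} or {:error, {:missing_data_keys, keys}}.
  def from_record(%{id: id, upstream_id: upstream_id, data: %{} = data} = record) do
    case Enum.reject(@data_keys, &Map.has_key?(data, &1)) do
      [] ->
        {:ok,
         %{
           id: id,
           upstream_id: upstream_id,
           customer_id: data["stripe_customer_id"],
           plan_id: data["plan_id"],
           quantity: data["quantity"],
           internal_id: id,
           updated_idx: Map.get(record, :updated_idx),
           upstream_created_at: Map.get(record, :upstream_created_at),
           upstream_updated_at: Map.get(record, :upstream_updated_at)
         }}

      missing ->
        {:error, {:missing_data_keys, missing}}
    end
  end
end

# Usage with illustrative records:
complete = %{
  id: "rec_1",
  upstream_id: "sub_123",
  data: %{"stripe_customer_id" => "cus_1", "plan_id" => "plan_basic", "quantity" => 2},
  updated_idx: 10
}

{:ok, attrs} = MyApp.Stripe.SubscriptionAttrs.from_record(complete)

incomplete = %{id: "rec_2", upstream_id: "sub_456", data: %{"plan_id" => "plan_basic"}}
{:error, reason} = MyApp.Stripe.SubscriptionAttrs.from_record(incomplete)
```

With this shape, `upsert_record/1` could pass `attrs` to `Subscription.changeset/2` on success and log or skip the record on error instead of crashing.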