# Upsert to Postgres (Elixir/Ecto)

> One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.

[Read more about upserting to Postgres](/how-to/upsert-to-postgres).

[Ecto](https://github.com/elixir-ecto/ecto) describes itself as a toolkit for data mapping and language-integrated query for Elixir. You define schemas that map to database tables, then read and write rows through a repository module.

To ingest Sequin records with Ecto, first set up a schema for the destination table like this:

```elixir
defmodule MyApp.Stripe.Subscription do
  use Ecto.Schema
  import Ecto.Changeset

  @fields [
    :id,
    :upstream_id,
    :customer_id,
    :plan_id,
    :quantity,
    :internal_id,
    :updated_idx,
    :upstream_updated_at,
    :upstream_created_at
  ]

  @schema_prefix :stripe
  @primary_key {:id, :string, []}
  schema "subscriptions" do
    field(:upstream_id, :string)
    field(:customer_id, :string)
    field(:plan_id, :string)
    field(:quantity, :integer)
    field(:internal_id, :string)
    field(:updated_idx, :integer)
    field(:upstream_updated_at, :naive_datetime)
    field(:upstream_created_at, :naive_datetime)

    timestamps()
  end

  def changeset(subscription, attrs) do
    subscription
    |> cast(attrs, @fields)
    |> validate_required(@fields)
  end
end
```
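The schema above expects a `subscriptions` table in a Postgres schema named `stripe`, with a string primary key. A minimal migration sketch that would create such a table might look like the following. The module name is hypothetical, and the column types are simply mirrored from the Ecto schema; constraints and indexes are left out and should be adjusted to your needs.

```elixir
# Hypothetical migration; the module name is an assumption, not from the guide.
defmodule MyApp.Repo.Migrations.CreateStripeSubscriptions do
  use Ecto.Migration

  def up do
    # The Ecto schema sets @schema_prefix :stripe, so the Postgres schema must exist.
    execute("CREATE SCHEMA IF NOT EXISTS stripe")

    # primary_key: false disables the default integer id so we can add a string id.
    create table(:subscriptions, prefix: "stripe", primary_key: false) do
      add :id, :string, primary_key: true
      add :upstream_id, :string
      add :customer_id, :string
      add :plan_id, :string
      add :quantity, :integer
      add :internal_id, :string
      add :updated_idx, :integer
      add :upstream_updated_at, :naive_datetime
      add :upstream_created_at, :naive_datetime

      # Adds inserted_at and updated_at, matching timestamps() in the schema.
      timestamps()
    end
  end

  def down do
    drop table(:subscriptions, prefix: "stripe")
  end
end
```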

You can also define a struct for Sequin [records](/core-concepts#records) like this:

```elixir
defmodule Sequin.Record do
  defstruct [
    :id,
    :upstream_id,
    :collection_id,
    :data,
    :deleted,
    :inserted_at,
    :provider,
    :stream_id,
    :sync,
    :updated_idx,
    :upstream_created_at,
    :upstream_updated_at
  ]
end
```
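If records reach your application as decoded JSON, they will typically be maps with string keys rather than structs. One possible way to build a struct from such a map is sketched below. `MyApp.RecordDecoder` and `from_map/2` are hypothetical names used for illustration, not part of Sequin, and the sketch assumes the payload keys match the struct field names.

```elixir
defmodule MyApp.RecordDecoder do
  # Hypothetical helper, not part of Sequin.
  # Builds `struct_module` from a map with string keys, such as a decoded
  # JSON payload. Keys the struct does not define are ignored, and fields
  # missing from the payload are set to nil.
  def from_map(struct_module, payload) when is_atom(struct_module) and is_map(payload) do
    fields =
      struct_module
      |> struct()
      |> Map.from_struct()
      |> Map.keys()
      |> Enum.map(fn key -> {key, Map.get(payload, Atom.to_string(key))} end)

    struct(struct_module, fields)
  end
end
```

Calling `MyApp.RecordDecoder.from_map(Sequin.Record, payload)` would return a `%Sequin.Record{}`. Nested values such as `data` are copied as-is, so `data` keeps its string keys.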

Then, your upsert logic will look like this:

```elixir
defmodule MyApp.Stripe do
  alias MyApp.Repo
  alias MyApp.Stripe.Subscription
  alias Sequin.Record

  def upsert_record(%Record{collection_id: "stripe:subscription"} = record) do
    %{
      id: id,
      upstream_id: upstream_id,
      data: %{
        "stripe_customer_id" => stripe_customer_id,
        "plan_id" => plan_id,
        "quantity" => quantity
      },
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    } = record

    # Map to internal columns
    changeset_attrs = %{
      id: id,
      upstream_id: upstream_id,
      customer_id: stripe_customer_id,
      plan_id: plan_id,
      quantity: quantity,
      internal_id: id,
      updated_idx: updated_idx,
      upstream_created_at: upstream_created_at,
      upstream_updated_at: upstream_updated_at
    }

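    # Insert the row, or update the existing row when the primary key already exists.
    # PostgreSQL requires :conflict_target whenever :on_conflict performs an update.
    %Subscription{}
    |> Subscription.changeset(changeset_attrs)
    |> Repo.insert(
      on_conflict: {:replace_all_except, [:upstream_id]},
      conflict_target: :id
    )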
  end

  # Define upsert functions for other Stripe records
  def upsert_record(%Record{collection_id: "stripe:customer"} = _record) do
    # TODO
  end
end
```
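`Repo.insert/2` returns `{:ok, struct}` on success and `{:error, changeset}` when validation fails, so a caller that processes many records can collect the failures instead of stopping at the first one. The sketch below shows one way to do that. `MyApp.Stripe.Batch` and `upsert_all/2` are hypothetical names, and the upsert function is passed in as an argument so the helper does not depend on a particular repo.

```elixir
defmodule MyApp.Stripe.Batch do
  # Hypothetical helper, not part of Sequin or Ecto.
  # Applies `upsert_fun` to every record. Counts {:ok, _} results and
  # collects every other result, paired with its record, under :errors.
  def upsert_all(records, upsert_fun)
      when is_list(records) and is_function(upsert_fun, 1) do
    records
    |> Enum.reduce(%{ok: 0, errors: []}, fn record, acc ->
      case upsert_fun.(record) do
        {:ok, _row} -> %{acc | ok: acc.ok + 1}
        other -> %{acc | errors: [{record, other} | acc.errors]}
      end
    end)
    # Errors were prepended, so restore the original record order.
    |> Map.update!(:errors, &Enum.reverse/1)
  end
end
```

With the module above, usage would be `MyApp.Stripe.Batch.upsert_all(records, &MyApp.Stripe.upsert_record/1)`. A record whose `collection_id` has no matching `upsert_record/1` clause still raises a `FunctionClauseError`, so you may want a catch-all clause that returns an error tuple.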
