In this guide we will build a consumer that upserts GitHub Pull Request records to Postgres.

Building and owning this consumer gives you full control of the schema, constraints, and business logic of your Postgres upserts. The code you write that pulls records from Sequin’s HTTP Consumer and upserts them to Postgres can be extended to translate or enrich the records to suit any application requirement.

Use this approach if you need a high level of control. If your application has less stringent requirements, the default Postgres Consumer might be a better fit.

Set up Sequin

Before starting this build, set up a GitHub sync. You can do this in the Sequin console or via the Management API. Ensure that you are syncing the Pull Request collection:

curl --request POST \
  --url https://api.sequin.io/v1/syncs \
  --header 'Authorization: Bearer YOUR_API_TOKEN' \
  --header 'Content-Type: application/json' \
  --data '{
    "provider": "github",
    "name": "Github Sync",
    "collection_ids": ["github:pull_request"],
    "credential": {
      "provider": "github",
      "properties": {
        "account_login": "RTLS",
        "account_type": "User",
        "installation_id": "48696650",
        "kind": "github_app"
      }
    }
  }'

Next, configure your Sequin HTTP Consumer. Here we use the Management API to create a Pull Consumer that filters for just GitHub Pull Requests:

curl --request POST \
  --url https://api.sequin.io/v1/http-consumers \
  --header 'Authorization: Bearer YOUR_API_TOKEN' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "GitHub Pull Request Consumer",
    "filter_collection_id": "github:pull_request"
  }'

That’s it for configuring Sequin!

The Code

To effectively use the HTTP Pull Consumer we need to:

  1. Pull records from the consumer via HTTP
  2. Process the records (in our case, upsert to Postgres)
  3. ACK or NACK the records for the consumer

Let’s get started. We’ll use Elixir. First, we create a new mix project:

mix new postgres_sink --sup

Then, install Req for our HTTP requests:

def deps do
  [
    ...
    {:req, "~> 0.4.13"}
  ]
end

To pull GitHub Pull Request records out of our Sequin Consumer, we will use a GenServer that polls for new records.

A GenServer in Elixir is a long-running process that holds state and executes code. In this example, the GenServer manages the lifecycle and business logic of our Postgres sink.

defmodule PostgresSink.Consumer do
  use GenServer

  require Logger

  def start_link(consumer_id) do
    GenServer.start_link(__MODULE__, consumer_id, name: __MODULE__)
  end

  @impl GenServer
  def init(consumer_id) do
    schedule_pull()
    {:ok, consumer_id}
  end

  defp schedule_pull(timeout \\ 0) do
    Process.send_after(self(), :pull, timeout)
  end

  @impl GenServer
  def handle_info(:pull, consumer_id) do
    Logger.info("Pulling messages")

    case pull_messages(consumer_id) do
      {:ok, []} ->
        Logger.info("No messages to pull")
        schedule_pull(:timer.seconds(5))

      {:ok, messages} ->
        Logger.info("Pulled #{length(messages)} messages")
        Enum.each(messages, &process_message(consumer_id, &1))
        schedule_pull()

      {:error, error} ->
        Logger.error("Failed to pull messages: #{inspect(error)}")
        schedule_pull(:timer.seconds(5))
    end

    {:noreply, consumer_id}
  end
end

We initialize the GenServer with a consumer_id and immediately schedule a pull from the Sequin HTTP Consumer. Every time we handle a :pull message, we pull messages from the consumer and schedule another :pull message.

If there are no new messages, or if the pull fails, we back off the next pull by a few seconds. Otherwise, we schedule the next pull immediately.
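To start polling alongside our application, we add the GenServer to the supervision tree that mix new postgres_sink --sup generated. A minimal sketch, where "YOUR_CONSUMER_ID" is a placeholder for the ID of the HTTP Consumer we created via the Management API:

defmodule PostgresSink.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # The consumer ID is passed through to PostgresSink.Consumer.start_link/1
      {PostgresSink.Consumer, "YOUR_CONSUMER_ID"}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: PostgresSink.Supervisor)
  end
end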

Implementing the pull is as easy as a single GET request using the consumer_id:

  def pull_messages(consumer_id) do
    "https://api.sequin.io/v1/http-consumers/#{consumer_id}/next"
    |> Req.get(headers: %{authorization: "Bearer #{@api_token}"})
    |> case do
      {:ok, %Req.Response{status: 200, body: %{"data" => messages}}} ->
        {:ok, messages}

      _ ->
        {:error, :failed_to_pull_messages}
    end
  end
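Note that we reference an @api_token module attribute that we haven’t defined. One minimal way to supply it, assuming the token lives in an environment variable:

  # An assumption: read the Sequin API token from an environment variable at
  # compile time. In production, you may prefer runtime Application config.
  @api_token System.get_env("SEQUIN_API_TOKEN")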

Sequin’s HTTP Pull Consumer keeps track of which messages have already been delivered, so calling next with your consumer_id is guaranteed to return the next set of messages to handle. To learn more, check out the HTTP Pull Consumer docs.

Let’s implement a placeholder process_message/2 that prints the message so we can take a look:

  defp process_message(consumer_id, message) do
    IO.inspect(message)
  end

We start our project with iex -S mix and take a look at the logs. One example of a message is:

%{
  "ack_token" => "JEpTLkFDSy5JWC1SRUNPUkRTLURFVi5ocGMtODk2NGZmYjUtMzYzZS00MWVkLTgwNmQtM2E4NTNkODZiZDcyLjEuMTAxLjMwNS4xNzExMDQzNjEyOTk1MDQyNTg3LjA",
  "record" => %{
    "collection_id" => "github:pull_request",
    "data" => %{
      "activeLockReason" => nil,
      "additions" => 437,
      "authorAssociation" => "OWNER",
      "baseRefName" => "main",
      "baseRefOid" => "4a778cbf7c83dbb98ec31c2a3ff56cc202278c71",
      "body" => "",
      "changedFiles" => 24,
      "closedAt" => "2024-03-21T17:49:32Z",
      "createdAt" => "2024-03-21T17:47:28Z",
      "databaseId" => 1785067841,
      "deletions" => 262,
      "headRefName" => "03-12-_Neovim_config_v2",
      "headRefOid" => "90c0aafdd5f9154dfdddd6b77d7688b671a8760a",
      "id" => "PR_kwDOLjrV6c5qZflB",
      "isDraft" => false,
      "locked" => false,
      "maintainerCanModify" => false,
      "mergeable" => "UNKNOWN",
      "merged" => true,
      "mergedAt" => "2024-03-21T17:49:32Z",
      "number" => 1,
      "repositoryId" => 775607785,
      "state" => "MERGED",
      "title" => "✨ Neovim config v2",
      "totalCommentsCount" => 1,
      "updatedAt" => "2024-03-21T17:49:32Z",
      "url" => "https://github.com/RTLS/dotfiles/pull/1"
    },
    "deleted" => false,
    "id" => "1b959a7a-09c2-4a32-9c48-31565ba42e15",
    "inserted_at" => nil,
    "sync_id" => "8492d258-fbce-46c3-923c-6e02ef83d72c",
    "upstream_created_at" => "2024-03-21T17:47:28.000000Z",
    "upstream_id" => "1785067841",
    "upstream_updated_at" => "2024-03-21T17:49:32.000000Z"
  }
}

At the top level are two keys: ack_token and record. We will use the ack_token later for acknowledgement. For now, let’s write our upsert logic to Postgres.

You can see that the record has upstream_id and deleted fields. We will use these two to determine whether to insert, update, or delete the record in our application database.

Database Setup

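The rest of this section assumes Ecto is wired into the project — roughly the following additions to deps (an assumption, since we only installed Req earlier), plus a configured PostgresSink.Repo:

def deps do
  [
    {:req, "~> 0.4.13"},
    # Assumed additions for the Postgres layer
    {:ecto_sql, "~> 3.11"},
    {:postgrex, "~> 0.17"}
  ]
end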
Let’s create a migration and a schema file for our Pull Request table in Postgres.

defmodule PostgresSink.Repo.Migrations.CreatePullRequests do
  use Ecto.Migration

  def change do
    create table(:pull_requests) do
      add :upstream_id, :string, null: false
      add :repository_id, :integer, null: false
      add :number, :integer, null: false
      add :title, :string, null: false
      add :body, :string
      add :state, :string, null: false
      add :url, :string, null: false
      add :created_at, :utc_datetime_usec, null: false
      add :updated_at, :utc_datetime_usec, null: false
      add :merged_at, :utc_datetime_usec
    end

    create unique_index(:pull_requests, [:upstream_id])
  end
end

defmodule PostgresSink.Github.PullRequest do
  use Ecto.Schema

  schema "pull_requests" do
    field :body, :string
    field :created_at, :utc_datetime_usec
    field :merged_at, :utc_datetime_usec
    field :number, :integer
    field :repository_id, :integer
    field :state, :string
    field :title, :string
    field :updated_at, :utc_datetime_usec
    field :upstream_id, :string
    field :url, :string
  end
end

This schema is entirely in your control because you own the Consumer. You choose which fields to sync, what the columns are named, and how to map datatypes. In this case, we will go with a straightforward subset of fields from the Record.

Upserts

Let’s now update our process_message function to upsert the incoming messages into this pull_requests table. We’ll pattern match on the deleted boolean and extract the record:

  defp process_message(consumer_id, message) do
    case message do
      %{"record" => %{"deleted" => true} = record} ->
        delete_pull_request(record)

      %{"record" => %{"deleted" => false} = record} ->
        upsert_pull_request(record)
    end
  end

We simply switch on deleted. Now for the delete_pull_request and upsert_pull_request functions.

The delete_pull_request function unpacks the upstream_id from the record, queries for the pull request in our table, and deletes it. If we don’t find the record, we return an :ok tuple because there is nothing to delete:

  defp delete_pull_request(record) do
    %{"upstream_id" => upstream_id} = record

    case Github.find_pull_request(upstream_id) do
      {:ok, pull_request} ->
        Logger.info("Deleting pull request #{upstream_id}")
        Github.delete_pull_request(pull_request)

      {:error, :not_found} ->
        # Nothing to delete; return an :ok tuple so callers can treat it as success
        {:ok, :noop}
    end
  end
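The Github.* helpers used in this guide belong to a small context module that isn’t shown. A minimal sketch of what it might look like with Ecto (the implementation is an assumption; adapt the changeset logic to your needs):

defmodule PostgresSink.Github do
  alias PostgresSink.Github.PullRequest
  alias PostgresSink.Repo

  def find_pull_request(upstream_id) do
    # Look the record up by its unique upstream_id
    case Repo.get_by(PullRequest, upstream_id: upstream_id) do
      nil -> {:error, :not_found}
      pull_request -> {:ok, pull_request}
    end
  end

  def create_pull_request(params) do
    %PullRequest{}
    |> Ecto.Changeset.cast(params, Map.keys(params))
    |> Repo.insert()
  end

  def update_pull_request(%PullRequest{} = pull_request, params) do
    pull_request
    |> Ecto.Changeset.cast(params, Map.keys(params))
    |> Repo.update()
  end

  def delete_pull_request(%PullRequest{} = pull_request) do
    Repo.delete(pull_request)
  end
end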

The upsert_pull_request function has a few steps:

  1. We unpack the fields that we want to sync from data.
  2. We map these fields to our internal field names as params. This explicit business logic determines how our Consumer maps a Sequin Record to our internal database schema.
  3. We query for an existing record by upstream_id. If it exists, we update it. If it does not exist, we create it.

  defp upsert_pull_request(record) do
    %{
      "upstream_id" => upstream_id,
      "data" => %{
        "repositoryId" => repository_id,
        "number" => number,
        "title" => title,
        "body" => body,
        "state" => state,
        "url" => url,
        "createdAt" => created_at,
        "updatedAt" => updated_at,
        "mergedAt" => merged_at
      }
    } = record

    params = %{
      upstream_id: upstream_id,
      repository_id: repository_id,
      number: number,
      title: title,
      body: body,
      state: state,
      url: url,
      created_at: created_at,
      updated_at: updated_at,
      merged_at: merged_at
    }

    case Github.find_pull_request(upstream_id) do
      {:ok, pull_request} ->
        Logger.info("Updating pull request #{upstream_id}")
        Github.update_pull_request(pull_request, params)

      {:error, :not_found} ->
        Logger.info("Creating pull request #{upstream_id}")
        Github.create_pull_request(params)
    end
  end
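As an aside: because we created a unique index on upstream_id, Postgres could perform the find-or-create in a single statement instead. A sketch using Ecto’s :on_conflict option (the function name is hypothetical):

  # Hypothetical alternative: a single-statement upsert keyed on the unique
  # :upstream_id index, replacing the find-then-insert/update round trip.
  defp upsert_pull_request_atomic(params) do
    %PostgresSink.Github.PullRequest{}
    |> Ecto.Changeset.cast(params, Map.keys(params))
    |> PostgresSink.Repo.insert(
      on_conflict: {:replace_all_except, [:id, :upstream_id]},
      conflict_target: :upstream_id
    )
  end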

Acknowledgement

The last step is to acknowledge (ACK) each message that we successfully handle and negative-acknowledge (NACK) any message that fails.

  defp process_message(consumer_id, message) do
    case message do
      %{"record" => %{"deleted" => true} = record} ->
        case delete_pull_request(record) do
          {:ok, _} -> ack_message(consumer_id, message)
          {:error, _} -> nack_message(consumer_id, message)
        end

      %{"record" => %{"deleted" => false} = record} ->
        case upsert_pull_request(record) do
          {:ok, _} -> ack_message(consumer_id, message)
          {:error, _} -> nack_message(consumer_id, message)
        end
    end
  end

To ACK or NACK, we pass the ack_token back to the Sequin HTTP Consumer:

  def ack_message(consumer_id, %{"ack_token" => ack_token}) do
    Logger.info("Acking message: #{ack_token}")

    Req.post(
      "https://api.sequin.io/v1/http-consumers/#{consumer_id}/ack",
      json: %{ack_tokens: [ack_token]},
      headers: %{authorization: "Bearer #{@api_token}"}
    )
  end

  def nack_message(consumer_id, %{"ack_token" => ack_token}) do
    Logger.info("Nacking message: #{ack_token}")

    Req.post(
      "https://api.sequin.io/v1/http-consumers/#{consumer_id}/nack",
      json: %{ack_tokens: [ack_token]},
      headers: %{authorization: "Bearer #{@api_token}"}
    )
  end
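Since both endpoints accept a list of ack_tokens, acknowledgements could also be batched once per pull rather than sent per message. A sketch (the function name is hypothetical):

  # Hypothetical batched variant: ACK every message from a single pull at once
  def ack_messages(consumer_id, messages) do
    ack_tokens = Enum.map(messages, & &1["ack_token"])

    Req.post(
      "https://api.sequin.io/v1/http-consumers/#{consumer_id}/ack",
      json: %{ack_tokens: ack_tokens},
      headers: %{authorization: "Bearer #{@api_token}"}
    )
  end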

Conclusion

Our Consumer is now ready to sync Pull Request records from a Sequin HTTP Consumer to our database.

Because we own this Consumer in our application code, we fully control the schema, constraints, and business logic. If we want to add a column, we can simply migrate our table and modify our upsert_pull_request function to extract the new field.
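For example, a hypothetical author_login column would need only a small migration, plus one more key extracted into params in upsert_pull_request (assuming the upstream record carries an author login):

defmodule PostgresSink.Repo.Migrations.AddAuthorLoginToPullRequests do
  use Ecto.Migration

  def change do
    alter table(:pull_requests) do
      # Hypothetical new column to illustrate extending the schema
      add :author_login, :string
    end
  end
end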

We can even enrich our Pull Requests with additional data like internal user_ids or vector embeddings for semantic search.

This consumer can even be modified to work with other databases like MySQL or Cassandra.