Build a sync to Postgres
Consume records from Sequin’s HTTP Consumer and upsert them to Postgres.
In this guide we will build a consumer that upserts GitHub Pull Request records to Postgres.
Building and owning this consumer gives you full control of the schema, constraints, and business logic of your Postgres upserts. The code you write that pulls records from Sequin’s HTTP Consumer and upserts them to Postgres can be extended to translate or enrich the records to suit any application requirement.
Use this approach if you need a high level of control. If your application has less stringent requirements, the default Postgres Consumer might be a better fit.
Set up Sequin
Before starting this build, set up a GitHub sync. You can do this in the Sequin console or via the Management API. Ensure that you are syncing the Pull Request collection.
curl --request POST \
--url https://api.sequin.io/v1/syncs \
--header 'Authorization: Bearer YOUR_API_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"provider": "github",
"name": "Github Sync",
"collection_ids": ["github:pull_request"],
"credential": {
"provider": "github",
"properties": {
"account_login": "RTLS",
"account_type": "User",
"installation_id": "48696650",
"kind": "github_app"
}
}
}'
Next, configure your Sequin HTTP Consumer. Here we use the Management API to create a Pull Consumer that filters for GitHub Pull Requests only.
curl --request POST \
--url https://api.sequin.io/v1/http-consumers \
--header 'Authorization: Bearer YOUR_API_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"name": "GitHub Pull Request Consumer",
"filter_collection_id": "github:pull_request"
}'
That’s it for configuring Sequin!
The Code
To effectively use the HTTP Pull Consumer we need to:
- Pull records from the consumer via HTTP
- Process the records (in our case, upsert to Postgres)
- ACK or NACK the records for the consumer
Let’s get started. We will use Elixir. First, create a new mix project:
mix new postgres_sink --sup
Then, install Req for our HTTP requests:
def deps do
[
...
{:req, "~> 0.4.13"}
]
end
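To run the consumer we build below, it needs to be started under the application's supervision tree. A minimal sketch, assuming the module layout used in this guide (the consumer id below is a placeholder — pass in your own Sequin HTTP Consumer id):

```elixir
# lib/postgres_sink/application.ex
defmodule PostgresSink.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # Start the consumer with your Sequin HTTP Consumer id (placeholder below)
      {PostgresSink.Consumer, "YOUR_CONSUMER_ID"}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: PostgresSink.Supervisor)
  end
end
```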
To pull GitHub Pull Request records out of our Sequin Consumer, we will use a GenServer that polls for new records.
A GenServer in Elixir is a process that has state and can execute code. In this example, we use the GenServer for the lifecycle and business logic of our Postgres sink.
defmodule PostgresSink.Consumer do
  use GenServer

  require Logger

  def start_link(consumer_id) do
    GenServer.start_link(__MODULE__, consumer_id, name: __MODULE__)
  end

  @impl GenServer
  def init(consumer_id) do
    schedule_pull()
    {:ok, consumer_id}
  end

  defp schedule_pull(timeout \\ 0) do
    Process.send_after(self(), :pull, timeout)
  end

  @impl GenServer
  def handle_info(:pull, consumer_id) do
    Logger.info("Pulling messages")

    case pull_messages(consumer_id) do
      {:ok, []} ->
        Logger.info("No messages to pull")
        schedule_pull(:timer.seconds(5))

      {:ok, messages} ->
        Logger.info("Pulled #{length(messages)} messages")
        Enum.each(messages, &process_message(consumer_id, &1))
        schedule_pull()

      {:error, error} ->
        Logger.error("Failed to pull messages: #{inspect(error)}")
        schedule_pull(:timer.seconds(5))
    end

    {:noreply, consumer_id}
  end
end
We initialize the GenServer with a consumer_id and immediately schedule a pull from the Sequin HTTP Consumer. Every time we handle a :pull message, we pull messages from the consumer and schedule another :pull message.
If there are no new messages, or if pulling fails, we back off the next pull by a few seconds. Otherwise, we schedule the next pull immediately.
Implementing the pull is as easy as a single GET request using the consumer_id:
def pull_messages(consumer_id) do
"https://api.sequin.io/v1/http-consumers/#{consumer_id}/next"
|> Req.get(headers: %{authorization: "Bearer #{@api_token}"})
|> case do
{:ok, %Req.Response{status: 200, body: %{"data" => messages}}} ->
{:ok, messages}
_ ->
{:error, :failed_to_pull_messages}
end
end
Sequin’s HTTP Pull Consumer keeps track of which messages have already been delivered, so calling next on your consumer_id is guaranteed to provide the next set of messages to handle. To learn more, check out the HTTP Pull Consumer docs.
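You can also exercise this endpoint outside of Elixir. Assuming the same consumer id and API token placeholders as earlier, a pull looks like:

```shell
curl --request GET \
  --url https://api.sequin.io/v1/http-consumers/YOUR_CONSUMER_ID/next \
  --header 'Authorization: Bearer YOUR_API_TOKEN'
```

A successful response has a data array of messages, as the pull_messages function above pattern matches on.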
Let’s implement a placeholder process_message/2 that prints the message so we can take a look:
defp process_message(consumer_id, message) do
IO.inspect(message)
end
We start our project with iex -S mix and take a look at the logs. One example of a message is:
%{
"ack_token" => "JEpTLkFDSy5JWC1SRUNPUkRTLURFVi5ocGMtODk2NGZmYjUtMzYzZS00MWVkLTgwNmQtM2E4NTNkODZiZDcyLjEuMTAxLjMwNS4xNzExMDQzNjEyOTk1MDQyNTg3LjA",
"record" => %{
"collection_id" => "github:pull_request",
"data" => %{
"activeLockReason" => nil,
"additions" => 437,
"authorAssociation" => "OWNER",
"baseRefName" => "main",
"baseRefOid" => "4a778cbf7c83dbb98ec31c2a3ff56cc202278c71",
"body" => "",
"changedFiles" => 24,
"closedAt" => "2024-03-21T17:49:32Z",
"createdAt" => "2024-03-21T17:47:28Z",
"databaseId" => 1785067841,
"deletions" => 262,
"headRefName" => "03-12-_Neovim_config_v2",
"headRefOid" => "90c0aafdd5f9154dfdddd6b77d7688b671a8760a",
"id" => "PR_kwDOLjrV6c5qZflB",
"isDraft" => false,
"locked" => false,
"maintainerCanModify" => false,
"mergeable" => "UNKNOWN",
"merged" => true,
"mergedAt" => "2024-03-21T17:49:32Z",
"number" => 1,
"repositoryId" => 775607785,
"state" => "MERGED",
"title" => "✨ Neovim config v2",
"totalCommentsCount" => 1,
"updatedAt" => "2024-03-21T17:49:32Z",
"url" => "https://github.com/RTLS/dotfiles/pull/1"
},
"deleted" => false,
"id" => "1b959a7a-09c2-4a32-9c48-31565ba42e15",
"inserted_at" => nil,
"sync_id" => "8492d258-fbce-46c3-923c-6e02ef83d72c",
"upstream_created_at" => "2024-03-21T17:47:28.000000Z",
"upstream_id" => "1785067841",
"upstream_updated_at" => "2024-03-21T17:49:32.000000Z"
}
}
At the top level are two keys: ack_token and record. We will use the ack_token later for acknowledgement. For now, let’s write our upsert logic to Postgres.
You can see that the record has upstream_id and deleted fields. We will use these two to determine whether to insert, update, or delete the record in our application database.
Database Setup
Let’s create a migration and a schema file for our Pull Request table in Postgres.
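The migration and schema below assume Ecto is wired up in the project. A minimal sketch, with placeholder connection details and assumed dependency versions:

```elixir
# In mix.exs, add Ecto and the Postgres driver alongside :req
# (version requirements here are assumptions):
#   {:ecto_sql, "~> 3.11"},
#   {:postgrex, ">= 0.0.0"}

# lib/postgres_sink/repo.ex
defmodule PostgresSink.Repo do
  use Ecto.Repo,
    otp_app: :postgres_sink,
    adapter: Ecto.Adapters.Postgres
end

# config/config.exs (connection details are placeholders)
import Config

config :postgres_sink, ecto_repos: [PostgresSink.Repo]

config :postgres_sink, PostgresSink.Repo,
  database: "postgres_sink_dev",
  hostname: "localhost"
```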
defmodule PostgresSink.Repo.Migrations.CreatePullRequests do
use Ecto.Migration
def change do
create table(:pull_requests) do
add :upstream_id, :string, null: false
add :repository_id, :integer, null: false
add :number, :integer, null: false
add :title, :string, null: false
add :body, :string
add :state, :string, null: false
add :url, :string, null: false
add :created_at, :utc_datetime_usec, null: false
add :updated_at, :utc_datetime_usec, null: false
add :merged_at, :utc_datetime_usec
end
create unique_index(:pull_requests, [:upstream_id])
end
end
defmodule PostgresSink.Github.PullRequest do
use Ecto.Schema
schema "pull_requests" do
field :body, :string
field :created_at, :utc_datetime_usec
field :merged_at, :utc_datetime_usec
field :number, :integer
field :repository_id, :integer
field :state, :string
field :title, :string
field :updated_at, :utc_datetime_usec
field :upstream_id, :string
field :url, :string
end
end
This schema is entirely in your control because you own the Consumer. You choose which fields to sync, what the columns are named, and how to map datatypes. In this case, we will go with a straightforward subset of fields from the Record.
Upserts
Let’s now update our process_message function to upsert the incoming messages to this pull_requests table. We will use pattern matching to case on the deleted boolean and extract the record:
defp process_message(consumer_id, message) do
  case message do
    %{"record" => %{"deleted" => true} = record} ->
      delete_pull_request(record)

    %{"record" => %{"deleted" => false} = record} ->
      upsert_pull_request(record)
  end
end
We simply switch on deleted. Now for our delete_ and upsert_ functions.
The delete_pull_request function unpacks the upstream_id from the record, queries for the pull request in our table, and deletes it. If we don’t find the record, we return a success value because there is nothing to delete:
defp delete_pull_request(record) do
  %{"upstream_id" => upstream_id} = record

  case Github.find_pull_request(upstream_id) do
    {:ok, pull_request} ->
      Logger.info("Deleting pull request #{upstream_id}")
      Github.delete_pull_request(pull_request)

    {:error, :not_found} ->
      # Nothing to delete; return a success tuple so the message is acked
      {:ok, :not_found}
  end
end
The upsert_pull_request function has a few steps:
- We unpack the fields that we want to sync from the record and its data.
- We map these fields to our internal field names as params. This explicit business logic determines how our Consumer maps a Sequin Record to our internal database schema.
- We query for an existing record by upstream_id. If it exists, we update it. If it does not exist, we create it.
defp upsert_pull_request(record) do
%{
"upstream_id" => upstream_id,
"data" => %{
"repositoryId" => repository_id,
"number" => number,
"title" => title,
"body" => body,
"state" => state,
"url" => url,
"createdAt" => created_at,
"updatedAt" => updated_at,
"mergedAt" => merged_at
}
} = record
params = %{
upstream_id: upstream_id,
repository_id: repository_id,
number: number,
title: title,
body: body,
state: state,
url: url,
created_at: created_at,
updated_at: updated_at,
merged_at: merged_at
}
case Github.find_pull_request(upstream_id) do
{:ok, pull_request} ->
Logger.info("Updating pull request #{upstream_id}")
Github.update_pull_request(pull_request, params)
{:error, :not_found} ->
Logger.info("Creating pull request #{upstream_id}")
Github.create_pull_request(params)
end
end
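The Github.find_pull_request/1, create, update, and delete helpers are left to you to implement. One possible sketch backed by an Ecto repo (the module name PostgresSink.Repo and the bare cast with no validations are assumptions — adapt them to your application):

```elixir
defmodule PostgresSink.Github do
  import Ecto.Changeset, only: [cast: 3]

  alias PostgresSink.Github.PullRequest
  alias PostgresSink.Repo

  # Fields we allow Sequin-sourced params to set
  @fields [:upstream_id, :repository_id, :number, :title, :body, :state,
           :url, :created_at, :updated_at, :merged_at]

  def find_pull_request(upstream_id) do
    case Repo.get_by(PullRequest, upstream_id: upstream_id) do
      nil -> {:error, :not_found}
      pull_request -> {:ok, pull_request}
    end
  end

  def create_pull_request(params) do
    %PullRequest{}
    |> cast(params, @fields)
    |> Repo.insert()
  end

  def update_pull_request(pull_request, params) do
    pull_request
    |> cast(params, @fields)
    |> Repo.update()
  end

  def delete_pull_request(pull_request), do: Repo.delete(pull_request)
end
```

Each of these returns {:ok, _} or {:error, _}, which lines up with the acknowledgement logic in the next section.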
Acknowledgement
The last step is to acknowledge (ACK) a message that we handle successfully and negative-acknowledge (NACK) a failed message.
defp process_message(consumer_id, message) do
case message do
%{"record" => %{"deleted" => true} = record} ->
case delete_pull_request(record) do
{:ok, _} -> ack_message(consumer_id, message)
{:error, _} -> nack_message(consumer_id, message)
end
%{"record" => %{"deleted" => false} = record} ->
case upsert_pull_request(record) do
{:ok, _} -> ack_message(consumer_id, message)
{:error, _} -> nack_message(consumer_id, message)
end
end
end
To ACK or NACK, we pass the ack token back to the Sequin HTTP Consumer:
def ack_message(consumer_id, %{"ack_token" => ack_token}) do
Logger.info("Acking message: #{ack_token}")
Req.post(
"https://api.sequin.io/v1/http-consumers/#{consumer_id}/ack",
json: %{ack_tokens: [ack_token]},
headers: %{authorization: "Bearer #{@api_token}"}
)
end
def nack_message(consumer_id, %{"ack_token" => ack_token}) do
Logger.info("Nacking message: #{ack_token}")
Req.post(
"https://api.sequin.io/v1/http-consumers/#{consumer_id}/nack",
json: %{ack_tokens: [ack_token]},
headers: %{authorization: "Bearer #{@api_token}"}
)
end
Conclusion
Our Consumer is ready to sync Pull Request records from a Sequin HTTP Consumer to our database.
Because we own this Consumer in our application code, we fully control the schema, constraints, and business logic. If we want to add a column, we can simply migrate our table with the new column and modify our upsert_pull_request function to extract the new field.
We can even enrich our Pull Requests with additional data, like internal user_ids or vector embeddings for semantic search.
This consumer can even be modified to work with other databases like MySQL or Cassandra.