In this guide we will build a Kafka consumer that upserts GitHub Pull Request records to Postgres.

Building and owning this consumer gives you full control over the schema, constraints, and business logic of your sync.

The Consumer is the translation layer from a Sequin Record to the internal schema of the Pull Request object in your application code.

Owning the Consumer also allows you to enrich your Pull Request records. For instance, you can include a vector embedding of the title + body to support semantic search.

Requirements

Before starting this build, set up a GitHub sync in the Sequin console. Ensure that you are syncing the Pull Request collection.

Consumer Setup

We will use Elixir for our project and BroadwayKafka for our Kafka consumer. You can find a (non-exhaustive) list of Kafka clients at apache.org.

Let’s get started. First we create a new project:

mix new postgres_sink --sup

Then, install BroadwayKafka with the initial setup copied from the Broadway docs:

def deps do
  [
    ...
    {:broadway_kafka, "~> 0.3"}
  ]
end

defmodule PostgresSink.Consumer do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [localhost: 9092],
             group_id: "group_1",
             topics: ["test"]
           ]},
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    )
  end

  ...callbacks...
end

The default configuration must change so that we consume from the Sequin-hosted Kafka stream. You can find your Kafka credentials in the console:

Kafka credentials

We update our host and include the sasl and ssl configuration required to connect to Sequin Kafka:

  ...

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: ["loved-bluefish-1234-us1-kafka.upstash.io": 9092],
             group_id: "group_1",
             topics: ["test"],
             client_config: [
               sasl: {:scram_sha_256, "bG92ZWQtb...", "MjRjNGZhN..."},
               ssl: true
             ]
           ]},
        concurrency: 1
      ],
    ...
    )
  end

We are now able to connect to our Kafka cluster. Before we can consume events, we have to configure BroadwayKafka to pull Records from the sequin.records topic in a dedicated consumer group.

  1. Topic: Sequin provides two topics, one for Events and one for Records. We want our synced Postgres table to contain the most up-to-date version of each object in our sync, so we will subscribe to the Records topic.
  2. Group ID: The Group ID should be unique to our consumer so that it is guaranteed to receive all records from Kafka. We will use postgres-sink-group.
  3. Offset Reset Policy: BroadwayKafka will automatically create a consumer group for our Group ID if it does not exist. When doing so, BroadwayKafka sets the initial offset value. Because we want to sync all Records from the beginning of time to Postgres, we will set our offset_reset_policy to :earliest.

Finalizing our connection config:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: ["loved-bluefish-1234-us1-kafka.upstash.io": 9092],
             group_id: "postgres-sink-group",
             topics: ["sequin.records"],
             offset_reset_policy: :earliest,
             client_config: [
               sasl: {:scram_sha_256, "bG92ZWQtb...", "MjRjNGZhN..."},
               ssl: true
             ]
           ]},
        concurrency: 1
      ],
    ...
    )
  end

The last step before starting our consumer is to create a stub handle_message callback that simply prints the message to our shell:

  @impl Broadway
  def handle_message(_processor, %Message{} = message, _context) do
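    # IO.inspect/2 returns its argument, so the callback still returns the message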
    IO.inspect(message, pretty: true)
  end
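
One thing to note before starting: for iex -S mix to actually start the consumer, PostgresSink.Consumer must be listed in the supervision tree that mix new --sup generated. A minimal sketch of lib/postgres_sink/application.ex, assuming the default generated module:

  def start(_type, _args) do
    children = [
      # Broadway defines child_spec/1, so the consumer can be listed directly
      {PostgresSink.Consumer, []}
    ]

    opts = [strategy: :one_for_one, name: PostgresSink.Supervisor]
    Supervisor.start_link(children, opts)
  end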

Starting the consumer with iex -S mix initiates consumption and prints Records from many Syncs and Collections. Keep in mind that the Kafka topic contains Records from every Sync in your organization.

One Pull Request object looks like:

Message: %Broadway.Message{
  data: "{\"collection_id\":\"github:pull_request\",\"sync_id\":\"7ece2bcd-3402-4093-9e2e-67dfcbdc31cd\",\"upstream_id\":\"779999417\",\"upstream_updated_at\":\"2021-11-14T01:33:02.000000Z\",\"deleted\":false,\"payload\":{\"activeLockReason\":null,\"additions\":368,\"authorAssociation\":\"OWNER\",\"baseRefName\":\"main\",\"baseRefOid\":\"19edd0c9fc1b3bdb595a5c7d208151fe79282000\",\"body\":\"\",\"changedFiles\":18,\"closedAt\":\"2021-11-14T01:33:02Z\",\"createdAt\":\"2021-11-14T01:32:50Z\",\"databaseId\":779999417,\"deletions\":28,\"headRefName\":\"feat/passwordless-sms-auth\",\"headRefOid\":\"861fc8139cf39d684994aed188808014d7bcb9a0\",\"id\":\"PR_kwDOGX_ZcM4ufdi5\",\"isDraft\":false,\"locked\":false,\"maintainerCanModify\":false,\"mergeable\":\"UNKNOWN\",\"merged\":true,\"mergedAt\":\"2021-11-14T01:33:02Z\",\"number\":1,\"repositoryId\":427809136,\"state\":\"MERGED\",\"title\":\"Feat/passwordless sms auth\",\"totalCommentsCount\":0,\"updatedAt\":\"2021-11-14T01:33:02Z\",\"url\":\"https://github.com/REDACTED/pull/1\"}}",
  metadata: %{
    offset: 440,
    partition: 2,
    key: "7ece2bcd-3402-4093-9e2e-67dfcbdc31cd:github:pull_request:779999417",
    headers: [{"_ups.type", "str"}],
    topic: "sequin.events.compacted",
    ts: 1705520930661
  },
  acknowledger: {BroadwayKafka.Acknowledger,
   {#PID<0.267.0>, {2, "sequin.events.compacted", 2}}, %{offset: 440}},
  batcher: :default,
  batch_key: {"sequin.events.compacted", 2},
  batch_mode: :bulk,
  status: :ok
}

The Broadway.Message metadata field includes the topic, key, and offset. The data field holds our Record object as a JSON string.

Let’s decode the payload to take a closer look:

%{
  "collection_id" => "github:pull_request",
  "deleted" => false,
  "payload" => %{
    "activeLockReason" => nil,
    "additions" => 368,
    "authorAssociation" => "OWNER",
    "baseRefName" => "main",
    "baseRefOid" => "19edd0c9fc1b3bdb595a5c7d208151fe79282000",
    "body" => "",
    "changedFiles" => 18,
    "closedAt" => "2021-11-14T01:33:02Z",
    "createdAt" => "2021-11-14T01:32:50Z",
    "databaseId" => 779999417,
    "deletions" => 28,
    "headRefName" => "feat/passwordless-sms-auth",
    "headRefOid" => "861fc8139cf39d684994aed188808014d7bcb9a0",
    "id" => "PR_kwDOGX_ZcM4ufdi5",
    "isDraft" => false,
    "locked" => false,
    "maintainerCanModify" => false,
    "mergeable" => "UNKNOWN",
    "merged" => true,
    "mergedAt" => "2021-11-14T01:33:02Z",
    "number" => 1,
    "repositoryId" => 427809136,
    "state" => "MERGED",
    "title" => "Feat/passwordless sms auth",
    "totalCommentsCount" => 0,
    "updatedAt" => "2021-11-14T01:33:02Z",
    "url" => "https://github.com/REDACTED/pull/1"
  },
  "resource_id" => "7ece2bcd-3402-4093-9e2e-67dfcbdc31cd",
  "upstream_id" => "779999417",
  "upstream_updated_at" => "2021-11-14T01:33:02.000000Z"
}

We will build our Postgres upserts on this data.

Database Setup
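
The migration and schema below assume Ecto is already wired into the project. If it is not, here is a rough sketch of what that involves; dependency versions and names are illustrative, not prescriptive:

# mix.exs
def deps do
  [
    ...
    {:ecto_sql, "~> 3.10"},
    {:postgrex, ">= 0.0.0"}
  ]
end

# lib/postgres_sink/repo.ex
defmodule PostgresSink.Repo do
  use Ecto.Repo,
    otp_app: :postgres_sink,
    adapter: Ecto.Adapters.Postgres
end

The Repo also needs the usual database configuration in config/config.exs, and it should be started in the supervision tree before PostgresSink.Consumer so the consumer can write as soon as it boots.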

Let’s create a migration and a schema file for our Pull Request table in Postgres.

defmodule PostgresSink.Repo.Migrations.CreatePullRequests do
  use Ecto.Migration

  def change do
    create table(:pull_requests) do
      add :upstream_id, :string, null: false
      add :repository_id, :integer, null: false
      add :number, :integer, null: false
      add :title, :string, null: false
      add :body, :string
      add :state, :string, null: false
      add :url, :string, null: false
      add :created_at, :utc_datetime_usec, null: false
      add :updated_at, :utc_datetime_usec, null: false
      add :merged_at, :utc_datetime_usec
    end
  end
end

defmodule PostgresSink.Github.PullRequest do
  use Ecto.Schema

  schema "pull_requests" do
    field :body, :string
    field :created_at, :utc_datetime_usec
    field :merged_at, :utc_datetime_usec
    field :number, :integer
    field :repository_id, :integer
    field :state, :string
    field :title, :string
    field :updated_at, :utc_datetime_usec
    field :upstream_id, :string
    field :url, :string
  end
end

This schema is entirely in your control because you own the Consumer. You choose which fields to sync, what the columns are named, and how to map datatypes. In this case, we will go with a straightforward subset of fields from the Record.
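
One optional addition: the upserts in the next section look up rows by upstream_id, so a unique index on that column (not part of the migration above, but a reasonable safeguard) keeps lookups fast and prevents duplicate rows:

    # appended to change/0 in the migration above
    create unique_index(:pull_requests, [:upstream_id])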

Upserts

Let’s now update our handle_message callback to upsert the incoming Kafka messages to this pull_requests table.
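
The snippets below use Jason to decode the JSON data. If it is not already in your deps, add it to mix.exs (the version shown is illustrative):

  {:jason, "~> 1.4"}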

First, let’s decode the data and filter for pull request records:

  @impl Broadway
  def handle_message(_processor, %Message{data: data} = message, _context) do
    case Jason.decode(data) do
      {:ok, %{"collection_id" => "github:pull_request"} = data} ->
        # TODO: perform our upsert
        message

      {:ok, _} ->
        # ignore other collections
        message
    end
  end

We decode the JSON string, then pattern match on the collection_id field to filter for just pull requests. The handle_message callback should return the original message.

We are almost ready to perform our upsert. Before we do, we should also check the deleted field. If the Record has been deleted, we want to delete the pull request from our table instead of upserting it:

  @impl Broadway
  def handle_message(_processor, %Message{data: data} = message, _context) do
    case Jason.decode(data) do
      {:ok, %{"collection_id" => "github:pull_request", "deleted" => deleted} = data} ->
        if deleted do
          delete_pull_request(data) # TODO: implement
        else
          upsert_pull_request(data) # TODO: implement
        end

        message

      ...
    end
  end

Sequin produces deleted Records to the sequin.records topic so that your Consumer can handle deletion events.

Finally, let’s implement our deletes and upserts.

The delete_pull_request function unpacks the upstream_id from the data, queries for the pull request from our table, and deletes it. If we don’t find the record, we return :ok because there is nothing to delete:

  defp delete_pull_request(data) do
    %{"upstream_id" => upstream_id} = data

    case Github.find_pull_request(upstream_id) do
      {:ok, pull_request} ->
        Github.delete_pull_request(pull_request)

      {:error, :not_found} ->
        :ok
    end
  end

The upsert_pull_request function has a few steps:

  1. We unpack the fields that we want to sync from data.
  2. We map these fields to our internal field names as params. This explicit business logic determines how our Consumer maps a Sequin Record to our internal database schema.
  3. We query for an existing record by upstream_id. If it exists, we update it. If it does not exist, we create it.

  defp upsert_pull_request(data) do
    %{
      "upstream_id" => upstream_id,
      "payload" => %{
        "repositoryId" => repository_id,
        "number" => number,
        "title" => title,
        "body" => body,
        "state" => state,
        "url" => url,
        "createdAt" => created_at,
        "updatedAt" => updated_at,
        "mergedAt" => merged_at
      }
    } = data

    params = %{
      upstream_id: upstream_id,
      repository_id: repository_id,
      number: number,
      title: title,
      body: body,
      state: state,
      url: url,
      created_at: created_at,
      updated_at: updated_at,
      merged_at: merged_at
    }

    case Github.find_pull_request(upstream_id) do
      {:ok, pull_request} ->
        Github.update_pull_request(pull_request, params)

      {:error, :not_found} ->
        Github.create_pull_request(params)
    end
  end
end
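
Both helpers call into a Github context module that this guide leaves to you. Here is a minimal sketch of what it could look like, assuming the PostgresSink.Repo and PullRequest schema above; the function names match the calls in the Consumer, while the internals are illustrative:

defmodule PostgresSink.Github do
  alias PostgresSink.Repo
  alias PostgresSink.Github.PullRequest

  @fields [
    :upstream_id, :repository_id, :number, :title, :body,
    :state, :url, :created_at, :updated_at, :merged_at
  ]

  def find_pull_request(upstream_id) do
    case Repo.get_by(PullRequest, upstream_id: upstream_id) do
      nil -> {:error, :not_found}
      %PullRequest{} = pull_request -> {:ok, pull_request}
    end
  end

  def create_pull_request(params) do
    # Ecto casts the ISO 8601 timestamp strings in params to :utc_datetime_usec
    %PullRequest{}
    |> Ecto.Changeset.cast(params, @fields)
    |> Repo.insert()
  end

  def update_pull_request(%PullRequest{} = pull_request, params) do
    pull_request
    |> Ecto.Changeset.cast(params, @fields)
    |> Repo.update()
  end

  def delete_pull_request(%PullRequest{} = pull_request) do
    Repo.delete(pull_request)
  end
end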

Conclusion

Our Consumer is ready to sync Pull Request records from Sequin Kafka to our database.

Because we own this Consumer in our application code, we fully control the schema, constraints, and business logic. If we want to add a column, we can simply add it to our table in a migration and modify our upsert_pull_request function to extract the new field.
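
For example, to also store the additions count from the payload (a hypothetical change that reuses the additions field shown in the Record earlier):

  # in a new migration
  alter table(:pull_requests) do
    add :additions, :integer
  end

  # in the schema
  field :additions, :integer

  # in upsert_pull_request/1, pattern match "additions" out of the payload
  # and add additions: additions to the params map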

We can even enrich our Pull Requests with additional data like internal user_ids or vector embeddings for semantic search.

This consumer can even be modified to work with other databases like MySQL or Cassandra.
