Read more about upserting to Postgres.

Active Record is the most popular ORM for Ruby and the default ORM that ships with Ruby on Rails.

To handle Sequin records using ActiveRecord, first set up your models like this:

class Subscription < ApplicationRecord
  self.table_name = 'subscriptions'
end

class SequinRecord
  attr_reader :id, :upstream_id, :collection_id, :data, :deleted, :inserted_at, :provider, :stream_id, :sync, :updated_idx, :upstream_created_at, :upstream_updated_at

  def initialize(attributes)
    @id = attributes[:id]
    @upstream_id = attributes[:upstream_id]
    @collection_id = attributes[:collection_id]
    @data = attributes[:data]
    @deleted = attributes[:deleted]
    @inserted_at = attributes[:inserted_at]
    @provider = attributes[:provider]
    @stream_id = attributes[:stream_id]
    @sync = attributes[:sync]
    @updated_idx = attributes[:updated_idx]
    @upstream_created_at = attributes[:upstream_created_at]
    @upstream_updated_at = attributes[:upstream_updated_at]
  end
end

Then you can upsert records like this:

class StripeSyncService
  def self.process_record(record)
    case record.collection_id
    when "stripe:subscription"
      attributes = {
        id: record.id,
        upstream_id: record.upstream_id,
        stripe_customer_id: record.data['stripe_customer_id'],
        plan_id: record.data['plan_id'],
        quantity: record.data['quantity'],
        internal_id: record.sync["external_id"],
        updated_idx: record.updated_idx,
        upstream_created_at: record.upstream_created_at,
        upstream_updated_at: record.upstream_updated_at
      }
      # Upsert the record
      Subscription.upsert(attributes, unique_by: :id)
    when "stripe:customer"
      # handle upserts to other collections/tables
    end
  end
end

Was this page helpful?