Read more about upserting to Postgres.

According to its website, GORM is a database wrapper and ORM that makes it easy to work with databases in Go.

Here’s an example of using GORM to process and upsert Sequin records:

package main

import (
	"fmt"
	"log"
	"time"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// SequinRecordSync is the element type of SequinRecord.Sync. Its fields are
// decoded from the JSON keys "id", "external_id" and "metadata"; none of the
// code shown alongside it reads them.
type SequinRecordSync struct {
	Id         string                 `json:"id"`
	ExternalId string                 `json:"external_id"`
	Metadata   map[string]interface{} `json:"metadata"`
}

// A SequinRecord is what Sequin's consume API returns.
//
// CollectionId is the value upsertRecord switches on to pick a handler
// (for example "stripe:subscription"). Data holds the collection-specific
// payload as an untyped map; upsertSubscription reads the keys
// "stripe_customer_id", "plan_id" and "quantity" from it. UpstreamId,
// UpdatedIdx, UpstreamCreatedAt and UpstreamUpdatedAt are copied onto the
// Subscription row by upsertSubscription.
//
// NOTE(review): Deleted presumably flags records removed upstream, but no
// code shown here inspects it — confirm how deletions should be handled.
type SequinRecord struct {
	CollectionId      string                 `json:"collection_id"`
	Data              map[string]interface{} `json:"data"`
	Deleted           bool                   `json:"deleted"`
	InsertedAt        time.Time              `json:"inserted_at"`
	Provider          string                 `json:"provider"`
	StreamId          string                 `json:"stream_id"`
	Sync              []SequinRecordSync     `json:"sync"`
	UpdatedIdx        int64                  `json:"updated_idx"`
	UpstreamId        string                 `json:"upstream_id"`
	UpstreamCreatedAt time.Time              `json:"upstream_created_at"`
	UpstreamUpdatedAt time.Time              `json:"upstream_updated_at"`
}

// Subscription is the struct representing your GORM-managed database table.
// Id carries the gorm "primaryKey" tag and is the column upsertSubscription
// names as its conflict target.
//
// NOTE(review): upsertSubscription copies record.UpstreamId into both Id and
// InternalId and never assigns the UpstreamId field — confirm that is intended.
type Subscription struct {
	Id                string `gorm:"primaryKey"`
	UpstreamId        string
	StripeCustomerId  string
	PlanId            string
	Quantity          int
	InternalId        string
	UpdatedAt         time.Time
	CreatedAt         time.Time
	UpdatedIdx        int64
	UpstreamUpdatedAt time.Time
	UpstreamCreatedAt time.Time
}

// upsertRecord routes a Sequin record to the upsert routine registered for
// its collection and returns that routine's error. Records belonging to a
// collection without a handler are skipped and nil is returned.
func upsertRecord(db *gorm.DB, record SequinRecord) error {
	switch collection := record.CollectionId; collection {
	case "stripe:subscription":
		return upsertSubscription(db, record)
	default:
		// TODO: Handle other collections here
		return nil
	}
}

// upsertSubscription maps a Sequin subscription record onto a Subscription
// row and writes it with an insert that, on a conflict on the "id" column,
// updates the listed columns instead.
//
// record.Data must hold string values under "stripe_customer_id" and
// "plan_id" and a whole-number value under "quantity". If a key is missing or
// holds a value of an unexpected type, an error naming the field is returned
// rather than panicking. Otherwise the error reported by GORM, if any, is
// returned.
func upsertSubscription(db *gorm.DB, record SequinRecord) error {
	stripeCustomerID, err := recordString(record.Data, "stripe_customer_id")
	if err != nil {
		return fmt.Errorf("subscription %q: %w", record.UpstreamId, err)
	}
	planID, err := recordString(record.Data, "plan_id")
	if err != nil {
		return fmt.Errorf("subscription %q: %w", record.UpstreamId, err)
	}
	quantity, err := recordInt(record.Data, "quantity")
	if err != nil {
		return fmt.Errorf("subscription %q: %w", record.UpstreamId, err)
	}

	// Take one timestamp so CreatedAt and UpdatedAt agree on first insert.
	now := time.Now()

	// NOTE(review): Id and InternalId are both filled from record.UpstreamId
	// and Subscription.UpstreamId is left at its zero value; kept as in the
	// original — confirm this mapping is intended.
	subscription := Subscription{
		Id:                record.UpstreamId,
		StripeCustomerId:  stripeCustomerID,
		PlanId:            planID,
		Quantity:          quantity,
		InternalId:        record.UpstreamId,
		UpdatedIdx:        record.UpdatedIdx,
		UpstreamCreatedAt: record.UpstreamCreatedAt,
		UpstreamUpdatedAt: record.UpstreamUpdatedAt,
		CreatedAt:         now,
		UpdatedAt:         now,
	}

	// Perform an upsert operation. "created_at" is deliberately absent from
	// the update list, so an existing row keeps its original creation time.
	return db.Clauses(clause.OnConflict{
		Columns: []clause.Column{{Name: "id"}}, // Column which acts as the conflict target
		DoUpdates: clause.AssignmentColumns([]string{
			"stripe_customer_id",
			"plan_id",
			"quantity",
			"internal_id",
			"updated_idx",
			"upstream_created_at",
			"upstream_updated_at",
			"updated_at",
		}),
	}).Create(&subscription).Error
}

// recordString returns data[key] as a string, or an error if the key is
// absent or its value is not a string.
func recordString(data map[string]interface{}, key string) (string, error) {
	value, ok := data[key].(string)
	if !ok {
		return "", fmt.Errorf("field %q: expected string, got %T", key, data[key])
	}
	return value, nil
}

// recordInt returns data[key] as an int, or an error if the key is absent or
// its value is not a whole number. It accepts int and int64 (maps built in Go
// code) as well as float64, which is how encoding/json stores every JSON
// number when decoding into interface{}.
func recordInt(data map[string]interface{}, key string) (int, error) {
	switch value := data[key].(type) {
	case int:
		return value, nil
	case int64:
		return int(value), nil
	case float64:
		n := int(value)
		// The round trip fails for fractional, NaN, infinite or
		// out-of-range inputs, all of which are rejected.
		if float64(n) != value {
			return 0, fmt.Errorf("field %q: %v is not a whole number", key, value)
		}
		return n, nil
	default:
		return 0, fmt.Errorf("field %q: expected number, got %T", key, data[key])
	}
}

// main opens a Postgres connection through GORM and upserts one hard-coded
// sample subscription record. It exits via log.Fatalf if either the
// connection or the upsert fails.
func main() {
	const connStr = "host=localhost user=<your-pg-user> password=<your-pg-password> dbname=<your-db-name> port=5432 sslmode=disable"

	db, openErr := gorm.Open(postgres.Open(connStr), &gorm.Config{})
	if openErr != nil {
		log.Fatalf("failed to connect database: %v", openErr)
	}

	// Un-comment this line to create the table in the database
	// db.AutoMigrate(&Subscription{})

	// Collection-specific payload of the sample record.
	payload := map[string]interface{}{
		"stripe_customer_id": "cus_4a5b6c7d8e9f",
		"plan_id":            "plan_9d8c7b6a5e4f",
		"quantity":           10,
	}

	// Hard-coded sample standing in for a record received from Sequin.
	sample := SequinRecord{
		UpstreamId:        "sub_7f4a2fd3b5c8",
		Data:              payload,
		UpdatedIdx:        1,
		UpstreamCreatedAt: time.Now(),
		UpstreamUpdatedAt: time.Now(),
	}

	upsertErr := upsertSubscription(db, sample)
	if upsertErr != nil {
		log.Fatalf("failed to upsert subscription: %v", upsertErr)
	}
}