Read more about upserting to Postgres. According to its website, GORM is a database wrapper and ORM that makes it easy to work with databases in Go. Here’s an example of using GORM to process and upsert Sequin records:Documentation Index
Fetch the complete documentation index at: https://sequin.io/docs/llms.txt
Use this file to discover all available pages before exploring further.
package main
import (
"log"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type SequinRecordSync struct {
Id string `json:"id"`
ExternalId string `json:"external_id"`
Metadata map[string]interface{} `json:"metadata"`
}
// A SequinRecord is what Sequin's consume API returns
type SequinRecord struct {
CollectionId string `json:"collection_id"`
Data map[string]interface{} `json:"data"`
Deleted bool `json:"deleted"`
InsertedAt time.Time `json:"inserted_at"`
Provider string `json:"provider"`
StreamId string `json:"stream_id"`
Sync []SequinRecordSync `json:"sync"`
UpdatedIdx int64 `json:"updated_idx"`
UpstreamId string `json:"upstream_id"`
UpstreamCreatedAt time.Time `json:"upstream_created_at"`
UpstreamUpdatedAt time.Time `json:"upstream_updated_at"`
}
// This is the struct representing your GORM-managed database table
type Subscription struct {
Id string `gorm:"primaryKey"`
UpstreamId string
StripeCustomerId string
PlanId string
Quantity int
InternalId string
UpdatedAt time.Time
CreatedAt time.Time
UpdatedIdx int64
UpstreamUpdatedAt time.Time
UpstreamCreatedAt time.Time
}
func upsertRecord(db *gorm.DB, record SequinRecord) error {
switch record.CollectionId {
case "stripe:subscription":
return upsertSubscription(db, record)
// TODO: Handle other collections here
}
return nil
}
func upsertSubscription(db *gorm.DB, record SequinRecord) error {
subscription := Subscription{
Id: record.UpstreamId,
StripeCustomerId: record.Data["stripe_customer_id"].(string),
PlanId: record.Data["plan_id"].(string),
Quantity: record.Data["quantity"].(int),
InternalId: record.UpstreamId,
UpdatedIdx: record.UpdatedIdx,
UpstreamCreatedAt: record.UpstreamCreatedAt,
UpstreamUpdatedAt: record.UpstreamUpdatedAt,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// Perform an upsert operation
return db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}}, // Column which acts as the conflict target
DoUpdates: clause.AssignmentColumns([]string{"stripe_customer_id", "plan_id", "quantity", "internal_id", "updated_idx", "upstream_created_at", "upstream_updated_at", "updated_at"}),
}).Create(&subscription).Error
}
func main() {
dsn := "host=localhost user=<your-pg-user> password=<your-pg-password> dbname=<your-db-name> port=5432 sslmode=disable"
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatalf("failed to connect database: %v", err)
}
// Un-comment this line to create the table in the database
// db.AutoMigrate(&Subscription{})
record := SequinRecord{
UpstreamId: "sub_7f4a2fd3b5c8",
Data: map[string]interface{}{
"stripe_customer_id": "cus_4a5b6c7d8e9f",
"plan_id": "plan_9d8c7b6a5e4f",
"quantity": 10,
},
UpdatedIdx: 1,
UpstreamCreatedAt: time.Now(),
UpstreamUpdatedAt: time.Now(),
}
if err := upsertSubscription(db, record); err != nil {
log.Fatalf("failed to upsert subscription: %v", err)
}
}

