Upsert to Postgres
Upsert to Postgres (Go)
One of the most popular destinations for API data is Postgres. Sequin makes it easy to sync API data to Postgres so you can read your data at rest.
Read more about upserting to Postgres.
According to its website, GORM is a database wrapper and ORM that makes it easy to work with databases in Go.
Here’s an example of using GORM to process and upsert Sequin records:
package main
import (
"log"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// SequinRecordSync is the element type of SequinRecord.Sync. Its JSON field
// names are fixed by the struct tags below.
type SequinRecordSync struct {
	Id         string         `json:"id"`
	ExternalId string         `json:"external_id"`
	Metadata   map[string]any `json:"metadata"` // free-form key/value payload
}
// SequinRecord mirrors a single record as returned by Sequin's consume API.
// The JSON key for every field is given by its struct tag.
type SequinRecord struct {
	CollectionId string         `json:"collection_id"` // e.g. "stripe:subscription"; used by upsertRecord to route the record
	Data         map[string]any `json:"data"`          // collection-specific payload, keyed by field name
	Deleted      bool           `json:"deleted"`
	InsertedAt   time.Time      `json:"inserted_at"`
	Provider     string         `json:"provider"`
	StreamId     string         `json:"stream_id"`
	Sync         []SequinRecordSync `json:"sync"`
	UpdatedIdx   int64          `json:"updated_idx"`
	UpstreamId   string         `json:"upstream_id"`

	UpstreamCreatedAt time.Time `json:"upstream_created_at"`
	UpstreamUpdatedAt time.Time `json:"upstream_updated_at"`
}
// Subscription is the GORM model backing your subscriptions table.
// Id is the primary key and is the column used as the upsert conflict target.
type Subscription struct {
	Id string `gorm:"primaryKey"`

	UpstreamId, StripeCustomerId, PlanId string

	Quantity   int
	InternalId string

	// Row timestamps.
	UpdatedAt, CreatedAt time.Time

	UpdatedIdx int64

	// Timestamps copied from the corresponding SequinRecord fields.
	UpstreamUpdatedAt, UpstreamCreatedAt time.Time
}
// upsertRecord routes a Sequin record to the upsert routine for its
// collection. Records from collections without a handler are ignored and
// nil is returned.
func upsertRecord(db *gorm.DB, record SequinRecord) error {
	switch collection := record.CollectionId; collection {
	case "stripe:subscription":
		return upsertSubscription(db, record)
	// TODO: Handle other collections here
	default:
		return nil
	}
}
// subscriptionFieldError reports a key of SequinRecord.Data that is missing
// or holds a value of an unexpected type.
type subscriptionFieldError struct {
	key  string // key looked up in record.Data
	want string // description of the expected type, e.g. "a string"
}

// Error implements the error interface.
func (e *subscriptionFieldError) Error() string {
	return "upsert subscription: field \"" + e.key + "\" is missing or is not " + e.want
}

// subscriptionString returns data[key] as a string. It returns an error,
// rather than panicking, when the key is absent or holds a non-string value.
func subscriptionString(data map[string]interface{}, key string) (string, error) {
	s, ok := data[key].(string)
	if !ok {
		return "", &subscriptionFieldError{key: key, want: "a string"}
	}
	return s, nil
}

// subscriptionInt returns data[key] as an int. It accepts int and int64
// (hand-built records) as well as integral float64 values, and returns an
// error when the key is absent or holds anything else.
func subscriptionInt(data map[string]interface{}, key string) (int, error) {
	switch v := data[key].(type) {
	case int:
		return v, nil
	case int64:
		return int(v), nil
	case float64:
		// encoding/json stores every JSON number decoded into an interface{}
		// as float64, so this is the case hit for records decoded from JSON.
		// Reject fractional (and NaN/out-of-range) values instead of truncating.
		if n := int(v); float64(n) == v {
			return n, nil
		}
	}
	return 0, &subscriptionFieldError{key: key, want: "an integer"}
}

// upsertSubscription maps a Sequin record onto a Subscription row and writes
// it with INSERT ... ON CONFLICT (id) DO UPDATE.
//
// record.Data must contain the string keys "stripe_customer_id" and "plan_id"
// and the integer key "quantity". A missing or mistyped key yields an error
// (no database call is made) instead of a panic. Any error from GORM is
// returned unchanged.
func upsertSubscription(db *gorm.DB, record SequinRecord) error {
	customerID, err := subscriptionString(record.Data, "stripe_customer_id")
	if err != nil {
		return err
	}
	planID, err := subscriptionString(record.Data, "plan_id")
	if err != nil {
		return err
	}
	quantity, err := subscriptionInt(record.Data, "quantity")
	if err != nil {
		return err
	}

	// Use one timestamp so CreatedAt and UpdatedAt are identical on insert.
	now := time.Now()
	subscription := Subscription{
		Id:                record.UpstreamId,
		StripeCustomerId:  customerID,
		PlanId:            planID,
		Quantity:          quantity,
		InternalId:        record.UpstreamId,
		UpdatedIdx:        record.UpdatedIdx,
		UpstreamCreatedAt: record.UpstreamCreatedAt,
		UpstreamUpdatedAt: record.UpstreamUpdatedAt,
		CreatedAt:         now,
		UpdatedAt:         now,
	}

	// Perform an upsert operation. created_at is deliberately absent from the
	// update list so an existing row keeps its original creation time.
	return db.Clauses(clause.OnConflict{
		Columns: []clause.Column{{Name: "id"}}, // Column which acts as the conflict target
		DoUpdates: clause.AssignmentColumns([]string{
			"stripe_customer_id",
			"plan_id",
			"quantity",
			"internal_id",
			"updated_idx",
			"upstream_created_at",
			"upstream_updated_at",
			"updated_at",
		}),
	}).Create(&subscription).Error
}
// main opens a GORM connection to Postgres and upserts one hard-coded sample
// subscription record, exiting via log.Fatalf if either step fails.
func main() {
	const connStr = "host=localhost user=<your-pg-user> password=<your-pg-password> dbname=<your-db-name> port=5432 sslmode=disable"

	conn, openErr := gorm.Open(postgres.Open(connStr), &gorm.Config{})
	if openErr != nil {
		log.Fatalf("failed to connect database: %v", openErr)
	}

	// Un-comment this line to create the table in the database
	// conn.AutoMigrate(&Subscription{})

	// Sample payload standing in for a record's "data" object.
	payload := map[string]interface{}{
		"stripe_customer_id": "cus_4a5b6c7d8e9f",
		"plan_id":            "plan_9d8c7b6a5e4f",
		"quantity":           10,
	}
	sample := SequinRecord{
		UpstreamId:        "sub_7f4a2fd3b5c8",
		Data:              payload,
		UpdatedIdx:        1,
		UpstreamCreatedAt: time.Now(),
		UpstreamUpdatedAt: time.Now(),
	}

	upsertErr := upsertSubscription(conn, sample)
	if upsertErr != nil {
		log.Fatalf("failed to upsert subscription: %v", upsertErr)
	}
}
Was this page helpful?