自学内容网 自学内容网

基于gorm.io/sharding分表中间件使用案例

项目背景

项目中需要用到mysql的分表场景,调研了一些常用的分库分表中间件,比如,mycat,小米的Gaea,这两个中间件太重了,学习成本较大,另外mycat不是go写的。我们需要一个轻量级的go版本的分表中间件。所以,把目光放在了如下这个开源组件上。go-gorm/sharding: High performance table sharding plugin for Gorm. (github.com)icon-default.png?t=O83Ahttps://github.com/go-gorm/sharding

案例

自定义分表函数以及主键生成自定义函数。该案例仅用于测试,分表的逻辑通常不会基于时间进行分表。测试中使用的主键生成方法也仅仅是用于测试,实际项目中并不推荐使用,该方式对于高并发场景并不友好,实际场景中使用预先生成的方式或是其他的分布式id生成器更好。

事实上,这个库有一些默认的主键id生成方式。

const (
// Use Snowflake primary key generator
PKSnowflake = iota
// Use PostgreSQL sequence primary key generator
PKPGSequence
// Use MySQL sequence primary key generator
PKMySQLSequence
// Use custom primary key generator
PKCustom
)

但是,在这个自定义分表逻辑场景中,使用默认的主键id生成方式,出现了bug。

 比如,使用KMySQLSequence这种主键生成方式,第二次执行插入sql操作就报了Error 1062 (23000): Duplicate entry '2' for key 'orders_2025.PRIMARY'这个错误。我大概看了一下导致错误的原因在于如下这个方法,这个方法的意思是主键id获取数据库中最后一次插入操作所生成的自增ID值,这就导致,第二次执行插入sql操作时,主键取的是已经存在的,导致报了上面的错误。

初步分析下来是这个原因,目前尚未验证这个错误原因。

func (s *Sharding) genMySQLSequenceKey(tableName string, index int64) int64 {
var id int64
err := s.DB.Exec("UPDATE `" + mySQLSeqName(tableName) + "` SET id = LAST_INSERT_ID(id + 1)").Error
if err != nil {
panic(err)
}
err = s.DB.Raw("SELECT LAST_INSERT_ID()").Scan(&id).Error
if err != nil {
panic(err)
}

return id
}

 使用PKSnowflake这种主键生成方式,导致panic。

 初步分析下来的原因是这个index源码中默认设定的最大值是1024,但是,在我们这个场景中,分表的后缀超过了1024,来到了2024,2025等。

同样,该错误原因尚未深入分析,可能不是这个原因,尚待进一步分析。

func (s *Sharding) genSnowflakeKey(index int64) int64 {
return s.snowflakeNodes[index].Generate().Int64()
}

测试案例如下: 

package test

import (
"fmt"
"testing"
"time"

"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/sharding"
)

var globalDB *gorm.DB

type Order struct {
ID        int64  `gorm:"primaryKey"`
OrderId   string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键
UserID    int64
ProductID int64
OrderDate time.Time
}

// 自定义 ShardingAlgorithm
func customShardingAlgorithm(value any) (suffix string, err error) {
if orderId, ok := value.(string); ok {
// 截取字符串,截取前8位,获取年份
orderId = orderId[0:8]
orderDate, err := time.Parse("20060102", orderId)
if err != nil {
return "", fmt.Errorf("invalid order_date")
}
year := orderDate.Year()
return fmt.Sprintf("_%d", year), nil
}
return "", fmt.Errorf("invalid order_date")
}

// customePrimaryKeyGeneratorFn 自定义主键生成函数
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {
var id int64
seqTableName := "gorm_sharding_orders_id_seq" // 序列表名
db := globalDB
// 使用事务来确保主键生成的原子性
tx := db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
}
}()

// 锁定序列表以确保并发安全(可选,取决于你的 MySQL 配置和并发级别)
// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳选择
// 这里仅作为示例,实际应用中可能需要更精细的并发控制策略
tx.Exec("LOCK TABLES " + seqTableName + " WRITE")

// 查询当前的最大 ID
tx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)

// 更新序列表(这里直接递增 1,实际应用中可能需要更复杂的逻辑)
newID := id + 1
tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 这里假设序列表允许插入任意 ID,实际应用中可能需要其他机制来确保 ID 的唯一性和连续性

// 释放锁定
tx.Exec("UNLOCK TABLES")

// 提交事务
if err := tx.Commit().Error; err != nil {
panic(err) // 实际应用中应该使用更优雅的错误处理机制
}

return newID
}

// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding(t *testing.T) {
// 连接到 MySQL 数据库
dsn := "dev:dreame@2020@tcp(10.10.37.108:13306)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.New(mysql.Config{
DSN: dsn,
}), &gorm.Config{})
if err != nil {
panic("failed to connect database")
}
globalDB = db

// 配置 Gorm Sharding 中间件,使用自定义的分片算法
middleware := sharding.Register(sharding.Config{
ShardingKey:           "order_id",
ShardingAlgorithm:     customShardingAlgorithm, // 使用自定义的分片算法
PrimaryKeyGenerator:   sharding.PKCustom,
PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,
}, "orders") // 逻辑表名为 "orders"
db.Use(middleware)

// 创建 Order 表
err = db.Exec(`CREATE TABLE IF NOT EXISTS orders_2024 (
id BIGINT PRIMARY KEY,
order_id VARCHAR(50),
user_id INT,
product_id INT,
order_date DATETIME
)`).Error
if err != nil {
panic("failed to create table")
}
err = db.Exec(`CREATE TABLE IF NOT EXISTS orders_2025 (
id BIGINT PRIMARY KEY,
order_id VARCHAR(50),
user_id INT,
product_id INT,
order_date DATETIME
)`).Error
if err != nil {
panic("failed to create table")
}

// 示例:插入订单数据
order := Order{
OrderId:   "20240101ORDER0001",
UserID:    1,
ProductID: 100,
OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
}
err = db.Create(&order).Error
if err != nil {
fmt.Println("Error creating order:", err)
}
order2 := Order{
OrderId:   "20250101ORDER0001",
UserID:    1,
ProductID: 100,
OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),
}
err = db.Create(&order2).Error
if err != nil {
fmt.Println("Error creating order:", err)
}

// 查询示例
var orders []Order
err = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Error
if err != nil {
fmt.Println("Error querying orders:", err)
}
fmt.Printf("Selected orders: %#v\n", orders)

// 更新示例
err = db.Model(&Order{}).Where("order_id=? and id=?", "20240101ORDER0001", int64(14)).Update("product_id", 102).Error
if err != nil {
fmt.Println("Error updating order:", err)
}

// 再次查询示例,验证更新是否生效
err = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Error
if err != nil {
fmt.Println("Error querying orders:", err)
}
fmt.Printf("Selected orders: %#v\n", orders)

// 删除示例
err = db.Model(&Order{}).Where("order_id=? and id =?", "20240101ORDER0001", int64(16)).Delete(&Order{}).Error
if err != nil {
fmt.Println("Error deleting order:", err)
}

// 再次查询示例,验证删除是否生效
err = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Error
if err != nil {
fmt.Println("Error querying orders:", err)
}
fmt.Printf("Selected orders: %#v\n", orders)
}

遗留问题 

1.上文中提到了两种默认主键生成方式报bug问题需要进一步定位以确定根因。

2.该组件的增删改查仅支持查询条件中包含分表主键的场景,对于,不包含主键需要全表扫描的场景并不支持,当然,全表扫描最优解并不是通过mysql的能力解决,最好是借助其他的方案,比如,通过es的方案。但是,对于,我们目前的场景中依然存在上述需求,所以,尚需考虑该场景如何解决。

3.IN查询也不支持。

本质上只支持,根据分表键路由到对应的表进行单表查询。


原文地址:https://blog.csdn.net/ezreal_pan/article/details/142461177

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!