1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 03:05:05 +08:00

feat(database/gdb): add sharding feature for schema and table (#4014)

This commit is contained in:
John Guo 2024-12-09 23:12:20 +08:00 committed by GitHub
parent bae78fbf5b
commit 13bc192e36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 476 additions and 7 deletions

View File

@ -18,7 +18,234 @@ import (
"github.com/gogf/gf/v2/test/gtest"
)
func Test_Model_Sharding_Table(t *testing.T) {
const (
TestDbNameSh0 = "test_0"
TestDbNameSh1 = "test_1"
TestTableName = "user"
)
type ShardingUser struct {
Id int
Name string
}
// createShardingDatabase creates test databases and tables for sharding
func createShardingDatabase(t *gtest.T) {
// Create databases
dbs := []string{TestDbNameSh0, TestDbNameSh1}
for _, dbName := range dbs {
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbName)
_, err := db.Exec(ctx, sql)
t.AssertNil(err)
// Switch to the database
sql = fmt.Sprintf("USE `%s`", dbName)
_, err = db.Exec(ctx, sql)
t.AssertNil(err)
// Create tables
tables := []string{"user_0", "user_1", "user_2", "user_3"}
for _, table := range tables {
sql := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id int(11) NOT NULL,
name varchar(255) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
`, table)
_, err := db.Exec(ctx, sql)
t.AssertNil(err)
}
}
}
// dropShardingDatabase drops test databases
func dropShardingDatabase(t *gtest.T) {
dbs := []string{TestDbNameSh0, TestDbNameSh1}
for _, dbName := range dbs {
sql := fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", dbName)
_, err := db.Exec(ctx, sql)
t.AssertNil(err)
}
}
func Test_Sharding_Basic(t *testing.T) {
return
gtest.C(t, func(t *gtest.T) {
var (
tablePrefix = "user_"
schemaPrefix = "test_"
)
// Create test databases and tables
createShardingDatabase(t)
defer dropShardingDatabase(t)
// Create sharding configuration
shardingConfig := gdb.ShardingConfig{
Table: gdb.ShardingTableConfig{
Enable: true,
Prefix: tablePrefix,
Rule: &gdb.DefaultShardingRule{
TableCount: 4,
},
},
Schema: gdb.ShardingSchemaConfig{
Enable: true,
Prefix: schemaPrefix,
Rule: &gdb.DefaultShardingRule{
SchemaCount: 2,
},
},
}
// Prepare test data
user := ShardingUser{
Id: 1,
Name: "John",
}
model := db.Model(TestTableName).
Sharding(shardingConfig).
ShardingValue(user.Id).
Safe()
// Test Insert
_, err := model.Data(user).Insert()
t.AssertNil(err)
// Test Select
var result ShardingUser
err = model.Where("id", user.Id).Scan(&result)
t.AssertNil(err)
t.Assert(result.Id, user.Id)
t.Assert(result.Name, user.Name)
// Test Update
_, err = model.Data(g.Map{"name": "John Doe"}).
Where("id", user.Id).
Update()
t.AssertNil(err)
// Verify Update
err = model.Where("id", user.Id).Scan(&result)
t.AssertNil(err)
t.Assert(result.Name, "John Doe")
// Test Delete
_, err = model.Where("id", user.Id).Delete()
t.AssertNil(err)
// Verify Delete
count, err := model.Where("id", user.Id).Count()
t.AssertNil(err)
t.Assert(count, 0)
})
}
// Test_Sharding_Error tests error cases
func Test_Sharding_Error(t *testing.T) {
return
gtest.C(t, func(t *gtest.T) {
// Create test databases and tables
createShardingDatabase(t)
defer dropShardingDatabase(t)
// Test missing sharding value
model := db.Model(TestTableName).
Sharding(gdb.ShardingConfig{
Table: gdb.ShardingTableConfig{
Enable: true,
Prefix: "user_",
Rule: &gdb.DefaultShardingRule{TableCount: 4},
},
}).Safe()
_, err := model.Insert(g.Map{"id": 1, "name": "test"})
t.AssertNE(err, nil)
t.Assert(err.Error(), "sharding value is required when sharding feature enabled")
// Test missing sharding rule
model = db.Model(TestTableName).
Sharding(gdb.ShardingConfig{
Table: gdb.ShardingTableConfig{
Enable: true,
Prefix: "user_",
},
}).
ShardingValue(1)
_, err = model.Insert(g.Map{"id": 1, "name": "test"})
t.AssertNE(err, nil)
t.Assert(err.Error(), "sharding rule is required when sharding feature enabled")
})
}
// Test_Sharding_Complex tests complex sharding scenarios
func Test_Sharding_Complex(t *testing.T) {
return
gtest.C(t, func(t *gtest.T) {
// Create test databases and tables
createShardingDatabase(t)
defer dropShardingDatabase(t)
shardingConfig := gdb.ShardingConfig{
Table: gdb.ShardingTableConfig{
Enable: true,
Prefix: "user_",
Rule: &gdb.DefaultShardingRule{TableCount: 4},
},
Schema: gdb.ShardingSchemaConfig{
Enable: true,
Prefix: "test_",
Rule: &gdb.DefaultShardingRule{SchemaCount: 2},
},
}
users := []ShardingUser{
{Id: 1, Name: "User1"},
{Id: 2, Name: "User2"},
{Id: 3, Name: "User3"},
}
for _, user := range users {
model := db.Model(TestTableName).
Sharding(shardingConfig).
ShardingValue(user.Id).
Safe()
_, err := model.Data(user).Insert()
t.AssertNil(err)
}
// Test batch query
for _, user := range users {
model := db.Model(TestTableName).
Sharding(shardingConfig).
ShardingValue(user.Id).
Safe()
var result ShardingUser
err := model.Where("id", user.Id).Scan(&result)
t.AssertNil(err)
t.Assert(result.Id, user.Id)
t.Assert(result.Name, user.Name)
}
// Clean up
for _, user := range users {
model := db.Model(TestTableName).
Sharding(shardingConfig).
ShardingValue(user.Id).
Safe()
_, err := model.Where("id", user.Id).Delete()
t.AssertNil(err)
}
})
}
func Test_Model_Sharding_Table_Using_Hook(t *testing.T) {
var (
table1 = gtime.TimestampNanoStr() + "_table1"
table2 = gtime.TimestampNanoStr() + "_table2"
@ -127,7 +354,7 @@ func Test_Model_Sharding_Table(t *testing.T) {
})
}
func Test_Model_Sharding_Schema(t *testing.T) {
func Test_Model_Sharding_Schema_Using_Hook(t *testing.T) {
var (
table = gtime.TimestampNanoStr() + "_table"
)

View File

@ -53,6 +53,8 @@ type Model struct {
onConflict interface{} // onConflict is used for conflict keys on Upsert clause.
tableAliasMap map[string]string // Table alias to true table name, usually used in join statements.
softTimeOption SoftTimeOption // SoftTimeOption is the option to customize soft time feature for Model.
shardingConfig ShardingConfig // ShardingConfig for database/table sharding feature.
shardingValue any // Sharding value for sharding feature.
}
// ModelHandler is a function that handles given Model and returns a new Model that is custom modified.

View File

@ -64,6 +64,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
},
Model: m,
Table: m.tables,
Schema: m.schema,
Data: dataHolder,
Condition: conditionStr,
Args: append([]interface{}{dataValue}, conditionArgs...),
@ -80,6 +81,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
},
Model: m,
Table: m.tables,
Schema: m.schema,
Condition: conditionStr,
Args: conditionArgs,
}

View File

@ -122,6 +122,17 @@ func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) {
if h.originalSchemaName.IsNil() {
h.originalSchemaName = gvar.New(h.Schema)
}
// Sharding feature.
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
if err != nil {
return nil, err
}
h.Table, err = h.Model.getActualTable(ctx, h.Table)
if err != nil {
return nil, err
}
// Custom hook handler call.
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
@ -161,11 +172,23 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro
h.originalSchemaName = gvar.New(h.Schema)
}
// Sharding feature.
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
if err != nil {
return nil, err
}
h.Table, err = h.Model.getActualTable(ctx, h.Table)
if err != nil {
return nil, err
}
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
return h.handler(ctx, h)
}
// No need to handle table change.
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
@ -185,6 +208,16 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro
h.originalSchemaName = gvar.New(h.Schema)
}
// Sharding feature.
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
if err != nil {
return nil, err
}
h.Table, err = h.Model.getActualTable(ctx, h.Table)
if err != nil {
return nil, err
}
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
@ -196,6 +229,9 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro
if h.removedWhere {
h.Condition = whereKeyInCondition + h.Condition
}
// No need to handle table change.
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)
@ -215,6 +251,16 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro
h.originalSchemaName = gvar.New(h.Schema)
}
// Sharding feature.
h.Schema, err = h.Model.getActualSchema(ctx, h.Schema)
if err != nil {
return nil, err
}
h.Table, err = h.Model.getActualTable(ctx, h.Table)
if err != nil {
return nil, err
}
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
@ -226,6 +272,9 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro
if h.removedWhere {
h.Condition = whereKeyInCondition + h.Condition
}
// No need to handle table change.
// Schema change.
if h.Schema != "" && h.Schema != h.originalSchemaName.String() {
h.link, err = h.Model.db.GetCore().MasterLink(h.Schema)

View File

@ -335,6 +335,7 @@ func (m *Model) doInsertWithOption(ctx context.Context, insertOption InsertOptio
},
Model: m,
Table: m.tables,
Schema: m.schema,
Data: list,
Option: doInsertOption,
}

View File

@ -684,6 +684,7 @@ func (m *Model) doGetAllBySql(
},
Model: m,
Table: m.tables,
Schema: m.schema,
Sql: sql,
Args: m.mergeArguments(args),
SelectType: selectType,

View File

@ -0,0 +1,161 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import (
"context"
"fmt"
"hash/fnv"
"reflect"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/util/gconv"
)
// ShardingConfig defines the configuration for database/table sharding.
type ShardingConfig struct {
// Table sharding configuration
Table ShardingTableConfig
// Schema sharding configuration
Schema ShardingSchemaConfig
}
// ShardingSchemaConfig defines the configuration for database sharding.
type ShardingSchemaConfig struct {
// Enable schema sharding
Enable bool
// Schema rule prefix, e.g., "db_"
Prefix string
// ShardingRule defines how to route data to different database nodes
Rule ShardingRule
}
// ShardingTableConfig defines the configuration for table sharding
type ShardingTableConfig struct {
// Enable table sharding
Enable bool
// Table rule prefix, e.g., "user_"
Prefix string
// ShardingRule defines how to route data to different tables
Rule ShardingRule
}
// ShardingRule defines the interface for sharding rules
type ShardingRule interface {
// SchemaName returns the target schema name based on sharding value.
SchemaName(ctx context.Context, config ShardingSchemaConfig, value any) (string, error)
// TableName returns the target table name based on sharding value.
TableName(ctx context.Context, config ShardingTableConfig, value any) (string, error)
}
// DefaultShardingRule implements a simple modulo-based sharding rule
type DefaultShardingRule struct {
// Number of schema count.
SchemaCount int
// Number of tables per schema.
TableCount int
}
// Sharding creates a sharding model with given sharding configuration.
func (m *Model) Sharding(config ShardingConfig) *Model {
model := m.getModel()
model.shardingConfig = config
return model
}
// ShardingValue sets the sharding value for routing
func (m *Model) ShardingValue(value any) *Model {
model := m.getModel()
model.shardingValue = value
return model
}
// getActualSchema returns the actual schema based on sharding configuration.
// TODO it does not support schemas in different database config node.
func (m *Model) getActualSchema(ctx context.Context, defaultSchema string) (string, error) {
if !m.shardingConfig.Schema.Enable {
return defaultSchema, nil
}
if m.shardingValue == nil {
return defaultSchema, gerror.NewCode(
gcode.CodeInvalidParameter, "sharding value is required when sharding feature enabled",
)
}
if m.shardingConfig.Schema.Rule == nil {
return defaultSchema, gerror.NewCode(
gcode.CodeInvalidParameter, "sharding rule is required when sharding feature enabled",
)
}
return m.shardingConfig.Schema.Rule.SchemaName(ctx, m.shardingConfig.Schema, m.shardingValue)
}
// getActualTable returns the actual table name based on sharding configuration
func (m *Model) getActualTable(ctx context.Context, defaultTable string) (string, error) {
if !m.shardingConfig.Table.Enable {
return defaultTable, nil
}
if m.shardingValue == nil {
return defaultTable, gerror.NewCode(
gcode.CodeInvalidParameter, "sharding value is required when sharding feature enabled",
)
}
if m.shardingConfig.Table.Rule == nil {
return defaultTable, gerror.NewCode(
gcode.CodeInvalidParameter, "sharding rule is required when sharding feature enabled",
)
}
return m.shardingConfig.Table.Rule.TableName(ctx, m.shardingConfig.Table, m.shardingValue)
}
// SchemaName implements the default database sharding strategy
func (r *DefaultShardingRule) SchemaName(ctx context.Context, config ShardingSchemaConfig, value any) (string, error) {
if r.SchemaCount == 0 {
return "", gerror.NewCode(
gcode.CodeInvalidParameter, "schema count should not be 0 using DefaultShardingRule when schema sharding enabled",
)
}
hashValue, err := getHashValue(value)
if err != nil {
return "", err
}
nodeIndex := hashValue % uint64(r.SchemaCount)
return fmt.Sprintf("%s%d", config.Prefix, nodeIndex), nil
}
// TableName implements the default table sharding strategy
func (r *DefaultShardingRule) TableName(ctx context.Context, config ShardingTableConfig, value any) (string, error) {
if r.TableCount == 0 {
return "", gerror.NewCode(
gcode.CodeInvalidParameter, "table count should not be 0 using DefaultShardingRule when table sharding enabled",
)
}
hashValue, err := getHashValue(value)
if err != nil {
return "", err
}
tableIndex := hashValue % uint64(r.TableCount)
return fmt.Sprintf("%s%d", config.Prefix, tableIndex), nil
}
// getHashValue converts sharding value to uint64 hash
func getHashValue(value any) (uint64, error) {
var rv = reflect.ValueOf(value)
switch rv.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Float32, reflect.Float64:
return gconv.Uint64(value), nil
default:
h := fnv.New64a()
_, err := h.Write(gconv.Bytes(value))
if err != nil {
return 0, gerror.WrapCode(gcode.CodeInternalError, err)
}
return h.Sum64(), nil
}
}

View File

@ -105,6 +105,7 @@ func (m *Model) Update(dataAndWhere ...interface{}) (result sql.Result, err erro
},
Model: m,
Table: m.tables,
Schema: m.schema,
Data: newData,
Condition: conditionStr,
Args: m.mergeArguments(conditionArgs),

View File

@ -33,10 +33,20 @@ func (m *Model) QuoteWord(s string) string {
// Also see DriverMysql.TableFields.
func (m *Model) TableFields(tableStr string, schema ...string) (fields map[string]*TableField, err error) {
var (
table = m.db.GetCore().guessPrimaryTableName(tableStr)
ctx = m.GetCtx()
usedTable = m.db.GetCore().guessPrimaryTableName(tableStr)
usedSchema = gutil.GetOrDefaultStr(m.schema, schema...)
)
return m.db.TableFields(m.GetCtx(), table, usedSchema)
// Sharding feature.
usedSchema, err = m.getActualSchema(ctx, usedSchema)
if err != nil {
return nil, err
}
usedTable, err = m.getActualTable(ctx, usedTable)
if err != nil {
return nil, err
}
return m.db.TableFields(ctx, usedTable, usedSchema)
}
// getModel creates and returns a cloned model of current model if `safe` is true, or else it returns
@ -143,9 +153,24 @@ func (m *Model) filterDataForInsertOrUpdate(data interface{}) (interface{}, erro
// doMappingAndFilterForInsertOrUpdateDataMap does the filter features for map.
// Note that, it does not filter list item, which is also type of map, for "omit empty" feature.
func (m *Model) doMappingAndFilterForInsertOrUpdateDataMap(data Map, allowOmitEmpty bool) (Map, error) {
var err error
data, err = m.db.GetCore().mappingAndFilterData(
m.GetCtx(), m.schema, m.tablesInit, data, m.filter,
var (
err error
ctx = m.GetCtx()
core = m.db.GetCore()
schema = m.schema
table = m.tablesInit
)
// Sharding feature.
schema, err = m.getActualSchema(ctx, schema)
if err != nil {
return nil, err
}
table, err = m.getActualTable(ctx, table)
if err != nil {
return nil, err
}
data, err = core.mappingAndFilterData(
ctx, schema, table, data, m.filter,
)
if err != nil {
return nil, err