1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 03:05:05 +08:00

add sharding feature for package gdb

This commit is contained in:
John Guo 2022-03-21 21:17:48 +08:00
parent 87ccc27ee4
commit 6dacdd60dc
18 changed files with 481 additions and 112 deletions

View File

@ -7,6 +7,7 @@ require (
github.com/gogf/gf/contrib/drivers/pgsql/v2 v2.0.0-rc2
github.com/gogf/gf/contrib/drivers/sqlite/v2 v2.0.0-rc2
github.com/gogf/gf/v2 v2.0.0
github.com/longbridgeapp/sqlparser v0.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
)

View File

@ -22,6 +22,7 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -47,6 +48,8 @@ github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk=
github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/longbridgeapp/sqlparser v0.3.1 h1:iWOZWGIFgQrJRgobLXUNJdvqGRpbVXkyKUKUA5CNJBE=
github.com/longbridgeapp/sqlparser v0.3.1/go.mod h1:GIHaUq8zvYyHLCLMJJykx1CdM6LHtkUih/QaJXySSx4=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=

View File

@ -215,10 +215,12 @@ type Driver interface {
}
// Link is a common database function wrapper interface.
// Note that, any operation using `Link` will have no SQL logging.
type Link interface {
QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error)
ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error)
PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error)
IsOnMaster() bool
IsTransaction() bool
}

View File

@ -12,7 +12,8 @@ import (
// dbLink is used to implement interface Link for DB.
type dbLink struct {
*sql.DB
*sql.DB // Underlying DB object.
isOnMaster bool // isOnMaster marks whether current link is operated on master node.
}
// txLink is used to implement interface Link for TX.
@ -21,11 +22,22 @@ type txLink struct {
}
// IsTransaction returns if current Link is a transaction.
func (*dbLink) IsTransaction() bool {
func (l *dbLink) IsTransaction() bool {
return false
}
// IsOnMaster checks and returns whether current link is operated on master node.
func (l *dbLink) IsOnMaster() bool {
return l.isOnMaster
}
// IsTransaction returns if current Link is a transaction.
func (*txLink) IsTransaction() bool {
func (l *txLink) IsTransaction() bool {
return true
}
// IsOnMaster checks and returns whether current link is operated on master node.
// Note that, transaction operation is always operated on master node.
func (l *txLink) IsOnMaster() bool {
return true
}

View File

@ -0,0 +1,228 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import (
"context"
"strings"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gctx"
"github.com/longbridgeapp/sqlparser"
)
// ShardingInput is input parameters for custom sharding handler.
type ShardingInput struct {
Table string // Current operation table name.
Schema string // Current operation schema, usually empty string which means uses default schema from configuration.
OperationData map[string]Value // Accurate readonly key-value data pairs from INSERT/UPDATE statement.
ConditionData map[string]Value // Accurate readonly key-value condition pairs from SELECT/UPDATE/DELETE statement.
}
// ShardingOutput is output parameters for custom sharding handler.
type ShardingOutput struct {
Table string // New table name for current operation. Use empty string for no changes of table name.
Schema string // New schema name for current operation. Use empty string for using default schema from configuration.
}
// ShardingHandler is a custom function for custom sharding table and schema for DB operation.
type ShardingHandler func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error)
const (
ctxKeyForShardingHandler gctx.StrKey = "ShardingHandler"
)
// Sharding creates and returns a new model with sharding handler.
func (m *Model) Sharding(handler ShardingHandler) *Model {
var (
ctx = m.GetCtx()
model = m.getModel()
)
model.shardingHandler = handler
// Inject sharding handler into context.
model = model.Ctx(model.injectShardingInputCaller(ctx))
return model
}
// injectShardingInputCaller injects custom sharding handler into context.
func (m *Model) injectShardingInputCaller(ctx context.Context) context.Context {
if m.shardingHandler == nil {
return ctx
}
if ctx.Value(ctxKeyForShardingHandler) != nil {
return ctx
}
return context.WithValue(ctx, ctxKeyForShardingHandler, m.shardingHandler)
}
type callShardingHandlerFromCtxInput struct {
Sql string
FormattedSql string
}
type callShardingHandlerFromCtxOutput struct {
Sql string
Table string
Schema string
}
func (c *Core) callShardingHandlerFromCtx(
ctx context.Context, in callShardingHandlerFromCtxInput,
) (out *callShardingHandlerFromCtxOutput, err error) {
var (
newSql = in.Sql
ctxValue interface{}
shardingHandler ShardingHandler
ok bool
)
if ctxValue = ctx.Value(ctxKeyForShardingHandler); ctxValue == nil {
return nil, nil
}
if shardingHandler, ok = ctxValue.(ShardingHandler); !ok {
return nil, nil
}
parsedOut, err := c.parseFormattedSql(in.FormattedSql)
if err != nil {
return nil, err
}
var shardingIn = ShardingInput{
Table: parsedOut.Table,
Schema: c.db.GetSchema(),
OperationData: parsedOut.OperationData,
ConditionData: parsedOut.ConditionData,
}
shardingOut, err := shardingHandler(ctx, shardingIn)
if err != nil {
return nil, gerror.Wrap(err, `calling sharding handler failed`)
}
if shardingOut.Table != shardingIn.Table || shardingOut.Schema != shardingIn.Schema {
if shardingOut.Table != shardingIn.Table {
newSql, err = c.formatSqlWithNewTable(in.Sql, shardingOut.Table)
if err != nil {
return nil, err
}
}
out = &callShardingHandlerFromCtxOutput{
Sql: newSql,
Table: shardingOut.Table,
Schema: shardingOut.Schema,
}
return out, nil
}
return nil, nil
}
// formatSqlWithNewTable modifies given `sql` and returns a sql with new table name `table`.
func (c *Core) formatSqlWithNewTable(sql, table string) (newSql string, err error) {
parsedStmt, err := sqlparser.NewParser(strings.NewReader(sql)).ParseStatement()
if err != nil {
return "", gerror.Wrapf(err, `parse failed for SQL: %s`, sql)
}
newTable := &sqlparser.TableName{Name: &sqlparser.Ident{Name: table}}
switch stmt := parsedStmt.(type) {
case *sqlparser.SelectStatement:
stmt.FromItems = newTable
return stmt.String(), nil
case *sqlparser.InsertStatement:
stmt.TableName = newTable
return stmt.String(), nil
case *sqlparser.UpdateStatement:
stmt.TableName = newTable
return stmt.String(), nil
case *sqlparser.DeleteStatement:
stmt.TableName = newTable
return stmt.String(), nil
default:
return "", gerror.Wrapf(err, `unsupported SQL: %s`, sql)
}
}
type parseFormattedSqlOutput struct {
Table string
OperationData map[string]Value
ConditionData map[string]Value
ParsedStmt sqlparser.Statement
}
func (c *Core) parseFormattedSql(formattedSql string) (*parseFormattedSqlOutput, error) {
var (
condition sqlparser.Expr
err error
out = &parseFormattedSqlOutput{
OperationData: make(map[string]Value),
ConditionData: make(map[string]Value),
}
)
out.ParsedStmt, err = sqlparser.NewParser(strings.NewReader(formattedSql)).ParseStatement()
if err != nil {
return nil, gerror.Wrapf(err, `parse failed for SQL: %s`, formattedSql)
}
switch stmt := out.ParsedStmt.(type) {
case *sqlparser.SelectStatement:
table, ok := stmt.FromItems.(*sqlparser.TableName)
if !ok {
return nil, gerror.Newf(
`invalid table name "%s" in SQL: %s`,
stmt.FromItems.String(), formattedSql,
)
}
out.Table = table.TableName()
condition = stmt.Condition
case *sqlparser.InsertStatement:
out.Table = stmt.TableName.TableName()
if len(stmt.Expressions) > 0 {
names := make([]string, len(stmt.ColumnNames))
for i, ident := range stmt.ColumnNames {
names[i] = ident.Name
}
// It just uses the first item.
for i, expr := range stmt.Expressions[0].Exprs {
c.injectDataByExpr(out.OperationData, names[i], expr)
}
}
case *sqlparser.UpdateStatement:
out.Table = stmt.TableName.TableName()
condition = stmt.Condition
if len(stmt.Assignments) > 0 {
for _, assignment := range stmt.Assignments {
if len(assignment.Columns) > 0 {
c.injectDataByExpr(out.OperationData, assignment.Columns[0].Name, assignment.Expr)
}
}
}
case *sqlparser.DeleteStatement:
out.Table = stmt.TableName.TableName()
condition = stmt.Condition
default:
return nil, gerror.Wrapf(err, `unsupported SQL: %s`, formattedSql)
}
err = sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error {
if n, ok := node.(*sqlparser.BinaryExpr); ok {
if x, ok := n.X.(*sqlparser.Ident); ok {
if n.Op == sqlparser.EQ {
c.injectDataByExpr(out.ConditionData, x.Name, n.Y)
}
}
}
return nil
}), condition)
return out, err
}
func (c *Core) injectDataByExpr(data map[string]Value, name string, expr sqlparser.Expr) {
switch exprImp := expr.(type) {
case *sqlparser.StringLit:
data[name] = gvar.New(exprImp.Value)
case *sqlparser.NumberLit:
data[name] = gvar.New(exprImp.Value)
default:
data[name] = gvar.New(exprImp.String())
}
}

View File

@ -129,6 +129,7 @@ func (c *Core) DoFilter(ctx context.Context, link Link, sql string, args []inter
// DoCommit commits current sql and arguments to underlying sql driver.
func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) {
var (
sqlTx *sql.Tx
sqlStmt *sql.Stmt
@ -137,9 +138,31 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
stmtSqlRows *sql.Rows
stmtSqlRow *sql.Row
rowsAffected int64
shardingOut *callShardingHandlerFromCtxOutput
cancelFuncForTimeout context.CancelFunc
formattedSql = FormatSqlWithArgs(in.Sql, in.Args)
timestampMilli1 = gtime.TimestampMilli()
)
shardingOut, err = c.callShardingHandlerFromCtx(ctx, callShardingHandlerFromCtxInput{
Sql: in.Sql,
FormattedSql: formattedSql,
})
if err != nil {
return
}
// Sharding handling.
if shardingOut != nil {
if shardingOut.Sql != "" {
in.Sql = shardingOut.Sql
}
// If schema changes, it here creates and uses a new DB link operation object.
if shardingOut.Schema != c.db.GetSchema() {
in.Link, err = c.db.GetCore().GetLink(ctx, in.Link.IsOnMaster(), shardingOut.Schema)
if err != nil {
return
}
}
}
// Trace span start.
tr := otel.GetTracerProvider().Tracer(traceInstrumentName, trace.WithInstrumentationVersion(gf.VERSION))
@ -232,7 +255,7 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
Sql: in.Sql,
Type: in.Type,
Args: in.Args,
Format: FormatSqlWithArgs(in.Sql, in.Args),
Format: formattedSql,
Error: err,
Start: timestampMilli1,
End: timestampMilli2,

View File

@ -41,6 +41,27 @@ func DBFromCtx(ctx context.Context) DB {
return nil
}
// GetLink creates and returns the underlying database link object with transaction checks.
// The parameter `master` specifies whether using the master node if master-slave configured.
func (c *Core) GetLink(ctx context.Context, master bool, schema string) (Link, error) {
tx := TXFromCtx(ctx, c.db.GetGroup())
if tx != nil {
return &txLink{tx.tx}, nil
}
if master {
link, err := c.db.GetCore().MasterLink(schema)
if err != nil {
return nil, err
}
return link, nil
}
link, err := c.db.GetCore().SlaveLink(schema)
if err != nil {
return nil, err
}
return link, nil
}
// MasterLink acts like function Master but with additional `schema` parameter specifying
// the schema for the connection. It is defined for internal usage.
// Also see Master.
@ -49,7 +70,10 @@ func (c *Core) MasterLink(schema ...string) (Link, error) {
if err != nil {
return nil, err
}
return &dbLink{db}, nil
return &dbLink{
DB: db,
isOnMaster: true,
}, nil
}
// SlaveLink acts like function Slave but with additional `schema` parameter specifying
@ -60,7 +84,10 @@ func (c *Core) SlaveLink(schema ...string) (Link, error) {
if err != nil {
return nil, err
}
return &dbLink{db}, nil
return &dbLink{
DB: db,
isOnMaster: false,
}, nil
}
// QuoteWord checks given string `s` a word,

View File

@ -266,6 +266,7 @@ func (m *Model) Clone() *Model {
} else {
newModel = m.db.Model(m.tablesInit)
}
// Basic attributes copy.
*newModel = *m
// Shallow copy slice attributes.
if n := len(m.extraArgs); n > 0 {

View File

@ -37,8 +37,8 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
in := &HookUpdateInput{
internalParamHookUpdate: internalParamHookUpdate{
internalParamHook: internalParamHook{
db: m.db,
link: m.getLink(true),
link: m.getLink(true),
model: m,
},
handler: m.hookHandler.Update,
},
@ -60,8 +60,8 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) {
in := &HookDeleteInput{
internalParamHookDelete: internalParamHookDelete{
internalParamHook: internalParamHook{
db: m.db,
link: m.getLink(true),
link: m.getLink(true),
model: m,
},
handler: m.hookHandler.Delete,
},

View File

@ -31,10 +31,10 @@ type HookHandler struct {
// internalParamHook manages all internal parameters for hook operations.
// The `internal` obviously means you cannot access these parameters outside this package.
type internalParamHook struct {
db DB // Underlying DB object.
link Link // Connection object from third party sql driver.
handlerCalled bool // Simple mark for custom handler called, in case of recursive calling.
removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix.
link Link // Connection object from third party sql driver.
model *Model // Underlying Model object.
handlerCalled bool // Simple mark for custom handler called, in case of recursive calling.
removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix.
}
type internalParamHookSelect struct {
@ -90,13 +90,22 @@ type HookDeleteInput struct {
Args []interface{}
}
const (
whereKeyInCondition = " WHERE "
)
// IsTransaction checks and returns whether current operation is during transaction.
func (h *internalParamHook) IsTransaction() bool {
return h.link.IsTransaction()
}
// Next calls the next hook handler.
func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) {
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
return h.handler(ctx, h)
}
return h.db.DoSelect(ctx, h.link, h.Sql, h.Args...)
return h.model.db.DoSelect(ctx, h.link, h.Sql, h.Args...)
}
// Next calls the next hook handler.
@ -105,39 +114,39 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro
h.handlerCalled = true
return h.handler(ctx, h)
}
return h.db.DoInsert(ctx, h.link, h.Table, h.Data, h.Option)
return h.model.db.DoInsert(ctx, h.link, h.Table, h.Data, h.Option)
}
// Next calls the next hook handler.
func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err error) {
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
if gstr.HasPrefix(h.Condition, " WHERE ") {
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
h.removedWhere = true
h.Condition = gstr.TrimLeftStr(h.Condition, " WHERE ")
h.Condition = gstr.TrimLeftStr(h.Condition, whereKeyInCondition)
}
return h.handler(ctx, h)
}
if h.removedWhere {
h.Condition = " WHERE " + h.Condition
h.Condition = whereKeyInCondition + h.Condition
}
return h.db.DoUpdate(ctx, h.link, h.Table, h.Data, h.Condition, h.Args...)
return h.model.db.DoUpdate(ctx, h.link, h.Table, h.Data, h.Condition, h.Args...)
}
// Next calls the next hook handler.
func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err error) {
if h.handler != nil && !h.handlerCalled {
h.handlerCalled = true
if gstr.HasPrefix(h.Condition, " WHERE ") {
if gstr.HasPrefix(h.Condition, whereKeyInCondition) {
h.removedWhere = true
h.Condition = gstr.TrimLeftStr(h.Condition, " WHERE ")
h.Condition = gstr.TrimLeftStr(h.Condition, whereKeyInCondition)
}
return h.handler(ctx, h)
}
if h.removedWhere {
h.Condition = " WHERE " + h.Condition
h.Condition = whereKeyInCondition + h.Condition
}
return h.db.DoDelete(ctx, h.link, h.Table, h.Condition, h.Args...)
return h.model.db.DoDelete(ctx, h.link, h.Table, h.Condition, h.Args...)
}
// Hook sets the hook functions for current model.

View File

@ -314,8 +314,8 @@ func (m *Model) doInsertWithOption(insertOption int) (result sql.Result, err err
in := &HookInsertInput{
internalParamHookInsert: internalParamHookInsert{
internalParamHook: internalParamHook{
db: m.db,
link: m.getLink(true),
link: m.getLink(true),
model: m,
},
handler: m.hookHandler.Insert,
},

View File

@ -536,8 +536,8 @@ func (m *Model) doGetAllBySql(sql string, args ...interface{}) (result Result, e
in := &HookSelectInput{
internalParamHookSelect: internalParamHookSelect{
internalParamHook: internalParamHook{
db: m.db,
link: m.getLink(false),
link: m.getLink(false),
model: m,
},
handler: m.hookHandler.Select,
},

View File

@ -1,80 +0,0 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import (
"context"
"reflect"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
)
// ShardingInput is input parameters for custom sharding handler.
type ShardingInput struct {
Table string // Current operation table name.
Schema string // Current operation schema, usually empty string which means uses default schema from configuration.
Data map[string]Value // Accurate key-value pairs from SELECT/INSERT/UPDATE/DELETE statement.
}
// ShardingOutput is output parameters for custom sharding handler.
type ShardingOutput struct {
Table string // New table name for current operation. Use empty string for no changes of table name.
Schema string // New schema name for current operation. Use empty string for using default schema from configuration.
}
type ShardingHandler func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error)
type callShardingHandlerInput struct {
Table string
InsertData List
UpdateData interface{}
Condition string
Sql string
}
func (m *Model) callShardingHandler(ctx context.Context, in callShardingHandlerInput) (out *ShardingOutput, err error) {
if m.shardingHandler == nil {
return &ShardingOutput{}, nil
}
return
}
func (m *Model) shardingDataFromInsertData(data List) (shardingData map[string]Value, err error) {
if len(data) == 0 {
return nil, nil
}
shardingData = make(map[string]Value)
// If given batch data(in batch insert scenario), it uses the first data.
for k, v := range data[0] {
shardingData[k] = gvar.New(v)
}
return shardingData, nil
}
func (m *Model) shardingDataFromUpdateData(data interface{}) (shardingData map[string]Value, err error) {
shardingData = make(map[string]Value)
switch value := data.(type) {
case map[string]interface{}:
for k, v := range value {
shardingData[k] = gvar.New(v)
}
case string:
default:
return nil, gerror.Newf(`unsupported data of type "%s" for sharding`, reflect.TypeOf(data))
}
return
}
func (m *Model) shardingDataFromSql(sql string, args []interface{}) (shardingData map[string]Value, err error) {
return
}
func (m *Model) shardingDataFromCondition(condition string) (shardingData map[string]Value, err error) {
return
}

View File

@ -79,8 +79,8 @@ func (m *Model) Update(dataAndWhere ...interface{}) (result sql.Result, err erro
in := &HookUpdateInput{
internalParamHookUpdate: internalParamHookUpdate{
internalParamHook: internalParamHook{
db: m.db,
link: m.getLink(true),
link: m.getLink(true),
model: m,
},
handler: m.hookHandler.Update,
},

View File

@ -143,7 +143,7 @@ func (m *Model) doWithScanStruct(pointer interface{}) error {
}
// Recursively with feature checks.
model = m.db.With(field.Value).Hook(m.hook)
model = m.db.With(field.Value).Hook(m.hookHandler)
if m.withAll {
model = model.WithAll()
} else {
@ -258,7 +258,7 @@ func (m *Model) doWithScanStructs(pointer interface{}) error {
fieldKeys = structType.FieldKeys()
}
// Recursively with feature checks.
model = m.db.With(field.Value).Hook(m.hook)
model = m.db.With(field.Value).Hook(m.hookHandler)
if m.withAll {
model = model.WithAll()
} else {

View File

@ -0,0 +1,138 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb_test
import (
"context"
"testing"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/test/gtest"
)
func Test_Model_Sharding(t *testing.T) {
table1 := createTable()
table2 := createTable()
defer dropTable(table1)
defer dropTable(table2)
gtest.C(t, func(t *gtest.T) {
_, err1 := db.Model(table1).Data(g.Map{
"id": 1,
}).Insert()
t.AssertNil(err1)
_, err2 := db.Model(table2).Data(g.Map{
"id": 2,
}).Insert()
t.AssertNil(err2)
})
// no sharding.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model(table1).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 1)
})
// with sharding handler.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model(table1).Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name and tables fields retrieving.
gtest.C(t, func(t *gtest.T) {
type User struct {
Id int
Passport string
Password string
NickName string
}
var users []User
err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).Scan(&users)
t.AssertNil(err)
t.Assert(len(users), 1)
t.Assert(users[0].Id, 2)
})
}
func Test_Model_Sharding_Schema(t *testing.T) {
var (
db1 = db
db2 = db.Schema(TestSchema2)
table1 = createTableWithDb(db1)
table2 = createTableWithDb(db2)
)
defer dropTableWithDb(db1, table1)
defer dropTableWithDb(db2, table2)
gtest.C(t, func(t *gtest.T) {
_, err1 := db1.Model(table1).Data(g.Map{
"id": 1,
}).Insert()
t.AssertNil(err1)
_, err2 := db2.Model(table2).Data(g.Map{
"id": 2,
}).Insert()
t.AssertNil(err2)
})
// no sharding.
gtest.C(t, func(t *gtest.T) {
all, err := db1.Model(table1).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 1)
})
gtest.C(t, func(t *gtest.T) {
_, err := db1.Model(table2).All()
// Table not exist error.
t.AssertNE(err, nil)
})
gtest.C(t, func(t *gtest.T) {
all, err := db2.Model(table2).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name and schema change.
gtest.C(t, func(t *gtest.T) {
all, err := db1.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
Schema: TestSchema2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
}

1
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/go-sql-driver/mysql v1.6.0
github.com/gorilla/websocket v1.5.0
github.com/grokify/html-strip-tags-go v0.0.1
github.com/longbridgeapp/sqlparser v0.3.1
github.com/olekukonko/tablewriter v0.0.5
go.opentelemetry.io/otel v1.0.0
go.opentelemetry.io/otel/sdk v1.0.0

4
go.sum
View File

@ -20,6 +20,8 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@ -41,6 +43,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0=
github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/longbridgeapp/sqlparser v0.3.1 h1:iWOZWGIFgQrJRgobLXUNJdvqGRpbVXkyKUKUA5CNJBE=
github.com/longbridgeapp/sqlparser v0.3.1/go.mod h1:GIHaUq8zvYyHLCLMJJykx1CdM6LHtkUih/QaJXySSx4=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=