1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 03:05:05 +08:00

feat(database/gdb): add transaction propagation&isolation level&readonly features (#4013)

This commit is contained in:
John Guo 2024-12-07 14:01:31 +08:00 committed by GitHub
parent b8142bf1fc
commit 2066aa4803
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1527 additions and 515 deletions

View File

@ -10,6 +10,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/database/gdb"
@ -41,7 +42,8 @@ var (
func init() {
nodeDefault := gdb.ConfigNode{
Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3306)/?loc=Local&parseTime=true", TestDbPass),
ExecTimeout: time.Second * 2,
Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3306)/?loc=Local&parseTime=true", TestDbPass),
}
partitionDefault := gdb.ConfigNode{
Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3307)/?loc=Local&parseTime=true", TestDbPass),

View File

@ -8,6 +8,7 @@ package mysql_test
import (
"context"
"database/sql"
"fmt"
"testing"
@ -807,12 +808,12 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) {
)
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// commit
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Data(g.Map{
err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Data(g.Map{
"id": 1,
"passport": "USER_1",
"password": "PASS_1",
@ -842,8 +843,8 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) {
t.AssertNil(err)
// rollback
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Data(g.Map{
err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Data(g.Map{
"id": 2,
"passport": "USER_2",
"password": "PASS_2",
@ -869,12 +870,12 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) {
// another record.
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// commit
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Data(g.Map{
err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Data(g.Map{
"id": 3,
"passport": "USER_1",
"password": "PASS_1",
@ -904,8 +905,8 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) {
t.AssertNil(err)
// rollback
err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Data(g.Map{
err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Data(g.Map{
"id": 4,
"passport": "USER_2",
"password": "PASS_2",
@ -945,11 +946,11 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) {
)
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// commit
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = db.Model(table).Ctx(ctx).Data(g.Map{
"id": 1,
"passport": "USER_1",
@ -980,8 +981,8 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) {
t.AssertNil(err)
// rollback
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Ctx(ctx).Data(g.Map{
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Ctx(ctx).Data(g.Map{
"id": 2,
"passport": "USER_2",
"password": "PASS_2",
@ -1007,11 +1008,11 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// commit
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = db.Model(table).Ctx(ctx).Data(g.Map{
"id": 3,
"passport": "USER_1",
@ -1042,8 +1043,8 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) {
t.AssertNil(err)
// rollback
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Model(table).Ctx(ctx).Data(g.Map{
err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Model(table).Ctx(ctx).Data(g.Map{
"id": 4,
"passport": "USER_2",
"password": "PASS_2",
@ -1143,3 +1144,567 @@ func Test_Transaction_Method(t *testing.T) {
t.Assert(count, int64(0))
})
}
func Test_Transaction_Propagation(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationRequired
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// Insert initial record
_, err := tx.Insert(table, g.Map{
"id": 1,
"passport": "required",
})
t.AssertNil(err)
// Nested transaction with PropagationRequired
err = tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequired,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should use the same transaction
_, err := tx2.Insert(table, g.Map{
"id": 2,
"passport": "required_nested",
})
return err
})
t.AssertNil(err)
return nil
})
t.AssertNil(err)
// Verify both records exist
count, err := db.Model(table).Count()
t.AssertNil(err)
t.Assert(count, int64(2))
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationRequiresNew
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// Insert in outer transaction
_, err := tx.Insert(table, g.Map{
"id": 3,
"passport": "outer",
})
t.AssertNil(err)
// Inner transaction with PropagationRequiresNew
err = tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
}, func(ctx context.Context, tx2 gdb.TX) error {
// This is a new transaction
_, _ = tx2.Insert(table, g.Map{
"id": 4,
"passport": "inner_new",
})
// Simulate error to test independent rollback
return gerror.New("rollback inner transaction")
})
// Inner transaction error should not affect outer transaction
t.AssertNE(err, nil)
return nil
})
t.AssertNil(err)
// Verify only outer transaction record exists
count, err := db.Model(table).Where("passport", "outer").Count()
t.AssertNil(err)
t.Assert(count, int64(1))
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationNested
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// Insert in outer transaction
_, err := tx.Insert(table, g.Map{
"id": 5,
"passport": "nested_outer",
})
t.AssertNil(err)
// Nested transaction
err = tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNested,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, _ = tx2.Insert(table, g.Map{
"id": 6,
"passport": "nested_inner",
})
// Simulate error to test savepoint rollback
return gerror.New("rollback to savepoint")
})
t.AssertNE(err, nil)
// Insert another record after nested transaction rollback
_, err = tx.Insert(table, g.Map{
"id": 7,
"passport": "nested_after",
})
t.AssertNil(err)
return nil
})
t.AssertNil(err)
// Verify outer transaction records exist, but nested transaction record doesn't
count, err := db.Model(table).Where("passport", "nested_inner").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
count, err = db.Model(table).Where("passport IN(?,?)",
"nested_outer", "nested_after").Count()
t.AssertNil(err)
t.Assert(count, int64(2))
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationNotSupported
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// Insert in transaction
_, err := tx.Insert(table, g.Map{
"id": 8,
"passport": "tx_record",
})
t.AssertNil(err)
// Non-transactional operation
err = tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
_, err := db.Insert(ctx, table, g.Map{
"id": 9,
"passport": "non_tx_record",
})
return err
})
t.AssertNil(err)
return gerror.New("rollback outer transaction")
})
t.AssertNE(err, nil)
// Verify transactional record is rolled back but non-transactional record exists
count, err := db.Model(table).Where("passport", "tx_record").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
count, err = db.Model(table).Where("passport", "non_tx_record").Count()
t.AssertNil(err)
t.Assert(count, int64(1))
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationMandatory
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationMandatory,
}, func(ctx context.Context, tx gdb.TX) error {
return nil
})
// Should fail because no transaction exists
t.AssertNE(err, nil)
// Test within an existing transaction
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
return tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationMandatory,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should succeed because transaction exists
_, err := tx2.Insert(table, g.Map{
"id": 10,
"passport": "mandatory",
})
return err
})
})
t.AssertNil(err)
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test PropagationNever
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNever,
}, func(ctx context.Context, tx gdb.TX) error {
// Should execute without transaction
t.Assert(tx, nil)
_, err := db.Insert(ctx, table, g.Map{
"id": 11,
"passport": "never",
})
return err
})
t.AssertNil(err)
// Test within an existing transaction
err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
return tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNever,
}, func(ctx context.Context, tx2 gdb.TX) error {
return nil
})
})
// Should fail because transaction exists
t.AssertNE(err, nil)
})
}
func Test_Transaction_Propagation_Complex(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
table1 := createTable()
table2 := createTable()
defer dropTable(table1)
defer dropTable(table2)
// Test nested transactions with different propagation behaviors
err := db.Transaction(ctx, func(ctx context.Context, tx1 gdb.TX) error {
// Insert in outer transaction
_, err := tx1.Insert(table1, g.Map{
"id": 1,
"passport": "outer",
})
t.AssertNil(err)
// First nested transaction (NESTED)
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNested,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err := tx2.Insert(table1, g.Map{
"id": 2,
"passport": "nested1",
})
t.AssertNil(err)
// Second nested transaction (REQUIRES_NEW)
err = tx2.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
}, func(ctx context.Context, tx3 gdb.TX) error {
_, _ = tx3.Insert(table1, g.Map{
"id": 3,
"passport": "new1",
})
// This will be rolled back independently
return gerror.New("rollback new transaction")
})
t.AssertNE(err, nil)
// Third nested transaction (NESTED)
return tx2.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNested,
}, func(ctx context.Context, tx3 gdb.TX) error {
_, _ = tx3.Insert(table1, g.Map{
"id": 4,
"passport": "nested2",
})
// This will rollback to the savepoint
return gerror.New("rollback nested transaction")
})
})
t.AssertNE(err, nil)
// Fourth transaction (NOT_SUPPORTED)
// Note that, it locks table if it continues using table1.
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
_, err := db.Insert(ctx, table2, g.Map{
"id": 5,
"passport": "not_supported",
})
return err
})
t.AssertNil(err)
return nil
})
t.AssertNil(err)
// Verify final state
// 1. "outer" should exist (committed)
count, err := db.Model(table1).Where("passport", "outer").Count()
t.AssertNil(err)
t.Assert(count, int64(1))
// 2. "nested1" should not exist (rolled back due to error)
count, err = db.Model(table1).Where("passport", "nested1").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
// 3. "new1" should not exist (rolled back independently)
count, err = db.Model(table1).Where("passport", "new1").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
// 4. "nested2" should not exist (rolled back to savepoint)
count, err = db.Model(table1).Where("passport", "nested2").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
// 5. "not_supported" should exist (non-transactional)
count, err = db.Model(table2).Where("passport", "not_supported").Count()
t.AssertNil(err)
t.Assert(count, int64(1))
})
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// Test transaction suspension and resume
err := db.Transaction(ctx, func(ctx context.Context, tx1 gdb.TX) error {
// Insert in outer transaction
_, err := tx1.Insert(table, g.Map{
"id": 6,
"passport": "suspend_outer",
"password": "pass6",
"nickname": "suspend_outer",
"create_time": gtime.Now().String(),
})
t.AssertNil(err)
// Suspend current transaction (NOT_SUPPORTED)
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
// Start a new independent transaction
return db.Transaction(ctx, func(ctx context.Context, tx3 gdb.TX) error {
_, err := tx3.Insert(table, g.Map{
"id": 7,
"passport": "independent",
"password": "pass7",
"nickname": "independent",
"create_time": gtime.Now().String(),
})
return err
})
})
t.AssertNil(err)
// Resume original transaction
_, err = tx1.Insert(table, g.Map{
"id": 8,
"passport": "suspend_resume",
"password": "pass8",
"nickname": "suspend_resume",
"create_time": gtime.Now().String(),
})
t.AssertNil(err)
// Simulate error to rollback outer transaction
return gerror.New("rollback outer transaction")
})
t.AssertNE(err, nil)
// Verify final state
// 1. "suspend_outer" and "suspend_resume" should not exist (rolled back)
count, err := db.Model(table).Where("passport IN(?,?)",
"suspend_outer", "suspend_resume").Count()
t.AssertNil(err)
t.Assert(count, int64(0))
// 2. "independent" should exist (committed independently)
count, err = db.Model(table).Where("passport", "independent").Count()
t.AssertNil(err)
t.Assert(count, int64(1))
})
}
func Test_Transaction_ReadOnly(t *testing.T) {
table := createInitTable()
defer dropTable(table)
gtest.C(t, func(t *gtest.T) {
// Test read-only transaction
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
ReadOnly: true,
}, func(ctx context.Context, tx gdb.TX) error {
// Try to modify data in read-only transaction
_, err := tx.Update(table, g.Map{"passport": "changed"}, "id=1")
// Should return error
return err
})
t.AssertNE(err, nil)
// Verify data was not modified
v, err := db.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v.String(), "user_1")
})
}
func Test_Transaction_Isolation(t *testing.T) {
// Test READ UNCOMMITTED
gtest.C(t, func(t *gtest.T) {
table := createInitTable()
defer dropTable(table)
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Isolation: sql.LevelReadUncommitted,
}, func(ctx context.Context, tx1 gdb.TX) error {
// Update value in first transaction
_, err := tx1.Update(table, g.Map{"passport": "dirty_read"}, "id=1")
t.AssertNil(err)
// Start another transaction to verify dirty read
err = db.TransactionWithOptions(ctx, gdb.TxOptions{
Isolation: sql.LevelReadUncommitted,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should see uncommitted change in READ UNCOMMITTED
v, err := tx2.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v.String(), "dirty_read")
return nil
})
t.AssertNil(err)
// Rollback the first transaction
return gerror.New("rollback first transaction")
})
t.AssertNE(err, nil)
// Verify the value is rolled back
v, err := db.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v.String(), "user_1")
})
// Test REPEATABLE READ (default)
gtest.C(t, func(t *gtest.T) {
table := createInitTable()
defer dropTable(table)
// Start a transaction with REPEATABLE READ isolation
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
Isolation: sql.LevelRepeatableRead,
}, func(ctx context.Context, tx1 gdb.TX) error {
// First read
v1, err := tx1.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
initialValue := v1.String()
// Another transaction updates and commits the value
err = db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err := tx2.Update(table, g.Map{
"passport": "changed_value",
}, "id=1")
t.AssertNil(err)
return nil
})
t.AssertNil(err)
// Verify the change is visible outside transaction
v, err := db.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v.String(), "changed_value")
// Should still see old value in REPEATABLE READ transaction
v2, err := tx1.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v2.String(), initialValue)
// Even after multiple reads, should still see the same value
v3, err := tx1.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v3.String(), initialValue)
return nil
})
t.AssertNil(err)
// After transaction ends, should see the committed change
v, err := db.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v.String(), "changed_value")
})
// Test SERIALIZABLE
gtest.C(t, func(t *gtest.T) {
table := createInitTable()
defer dropTable(table)
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
Isolation: sql.LevelSerializable,
}, func(ctx context.Context, tx1 gdb.TX) error {
// Read all records
_, err := tx1.Model(table).All()
t.AssertNil(err)
// Try concurrent insert in another transaction
err = db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
Isolation: sql.LevelSerializable,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err := tx2.Insert(table, g.Map{
"id": 1000,
"passport": "new_user",
})
return err
})
t.AssertNE(err, nil)
return nil
})
t.AssertNil(err)
})
// Test READ COMMITTED
gtest.C(t, func(t *gtest.T) {
table := createInitTable()
defer dropTable(table)
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
Isolation: sql.LevelReadCommitted,
}, func(ctx context.Context, tx1 gdb.TX) error {
// First read
v1, err := tx1.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
initialValue := v1.String()
// Another transaction updates and commits
err = db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationRequiresNew,
Isolation: sql.LevelReadCommitted,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err := tx2.Update(table, g.Map{"passport": "committed_value"}, "id=1")
return err
})
t.AssertNil(err)
// Should see new value in READ COMMITTED
v2, err := tx1.Model(table).Where("id=1").Value("passport")
t.AssertNil(err)
t.Assert(v2.String(), "committed_value")
t.AssertNE(v2.String(), initialValue)
return nil
})
t.AssertNil(err)
})
}

View File

@ -48,7 +48,7 @@ type DB interface {
// Raw creates and returns a model based on a raw sql not a table.
Raw(rawSql string, args ...interface{}) *Model
// Schema creates and returns a schema.
// Schema switches to a specified schema.
// Also see Core.Schema.
Schema(schema string) *Schema
@ -58,7 +58,6 @@ type DB interface {
// Open creates a raw connection object for database with given node configuration.
// Note that it is not recommended using the function manually.
// Also see DriverMysql.Open.
Open(config *ConfigNode) (*sql.DB, error)
// Ctx is a chaining function, which creates and returns a new DB that is a shallow copy
@ -78,173 +77,422 @@ type DB interface {
// Query APIs.
// ===========================================================================
Query(ctx context.Context, sql string, args ...interface{}) (Result, error) // See Core.Query.
Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) // See Core.Exec.
Prepare(ctx context.Context, sql string, execOnMaster ...bool) (*Stmt, error) // See Core.Prepare.
// Query executes a SQL query that returns rows using given SQL and arguments.
// The args are for any placeholder parameters in the query.
Query(ctx context.Context, sql string, args ...interface{}) (Result, error)
// Exec executes a SQL query that doesn't return rows (e.g., INSERT, UPDATE, DELETE).
// It returns sql.Result for accessing LastInsertId or RowsAffected.
Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error)
// Prepare creates a prepared statement for later queries or executions.
// The execOnMaster parameter determines whether the statement executes on master node.
Prepare(ctx context.Context, sql string, execOnMaster ...bool) (*Stmt, error)
// ===========================================================================
// Common APIs for CURD.
// ===========================================================================
Insert(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Insert.
InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.InsertIgnore.
InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) // See Core.InsertAndGetId.
Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Replace.
Save(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Save.
Update(ctx context.Context, table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) // See Core.Update.
Delete(ctx context.Context, table string, condition interface{}, args ...interface{}) (sql.Result, error) // See Core.Delete.
// Insert inserts one or multiple records into table.
// The data can be a map, struct, or slice of maps/structs.
// The optional batch parameter specifies the batch size for bulk inserts.
Insert(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error)
// InsertIgnore inserts records but ignores duplicate key errors.
// It works like Insert but adds IGNORE keyword to the SQL statement.
InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error)
// InsertAndGetId inserts a record and returns the auto-generated ID.
// It's a convenience method combining Insert with LastInsertId.
InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error)
// Replace inserts or replaces records using REPLACE INTO syntax.
// Existing records with same unique key will be deleted and re-inserted.
Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error)
// Save inserts or updates records using INSERT ... ON DUPLICATE KEY UPDATE syntax.
// It updates existing records instead of replacing them entirely.
Save(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error)
// Update updates records in table that match the condition.
// The data can be a map or struct containing the new values.
// The condition specifies the WHERE clause with optional placeholder args.
Update(ctx context.Context, table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
// Delete deletes records from table that match the condition.
// The condition specifies the WHERE clause with optional placeholder args.
Delete(ctx context.Context, table string, condition interface{}, args ...interface{}) (sql.Result, error)
// ===========================================================================
// Internal APIs for CURD, which can be overwritten by custom CURD implements.
// ===========================================================================
DoSelect(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoSelect.
DoInsert(ctx context.Context, link Link, table string, data List, option DoInsertOption) (result sql.Result, err error) // See Core.DoInsert.
DoUpdate(ctx context.Context, link Link, table string, data interface{}, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoUpdate.
DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoDelete.
// DoSelect executes a SELECT query using the given link and returns the result.
// This is an internal method that can be overridden by custom implementations.
DoSelect(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error)
DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoQuery.
DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error) // See Core.DoExec.
// DoInsert performs the actual INSERT operation with given options.
// This is an internal method that can be overridden by custom implementations.
DoInsert(ctx context.Context, link Link, table string, data List, option DoInsertOption) (result sql.Result, err error)
DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) // See Core.DoFilter.
DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) // See Core.DoCommit.
// DoUpdate performs the actual UPDATE operation.
// This is an internal method that can be overridden by custom implementations.
DoUpdate(ctx context.Context, link Link, table string, data interface{}, condition string, args ...interface{}) (result sql.Result, err error)
DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error) // See Core.DoPrepare.
// DoDelete performs the actual DELETE operation.
// This is an internal method that can be overridden by custom implementations.
DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error)
// DoQuery executes a query that returns rows.
// This is an internal method that can be overridden by custom implementations.
DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error)
// DoExec executes a query that doesn't return rows.
// This is an internal method that can be overridden by custom implementations.
DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error)
// DoFilter processes and filters SQL and args before execution.
// This is an internal method that can be overridden to implement custom SQL filtering.
DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error)
// DoCommit handles the actual commit operation for transactions.
// This is an internal method that can be overridden by custom implementations.
DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error)
// DoPrepare creates a prepared statement on the given link.
// This is an internal method that can be overridden by custom implementations.
DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error)
// ===========================================================================
// Query APIs for convenience purpose.
// ===========================================================================
GetAll(ctx context.Context, sql string, args ...interface{}) (Result, error) // See Core.GetAll.
GetOne(ctx context.Context, sql string, args ...interface{}) (Record, error) // See Core.GetOne.
GetValue(ctx context.Context, sql string, args ...interface{}) (Value, error) // See Core.GetValue.
GetArray(ctx context.Context, sql string, args ...interface{}) ([]Value, error) // See Core.GetArray.
GetCount(ctx context.Context, sql string, args ...interface{}) (int, error) // See Core.GetCount.
GetScan(ctx context.Context, objPointer interface{}, sql string, args ...interface{}) error // See Core.GetScan.
Union(unions ...*Model) *Model // See Core.Union.
UnionAll(unions ...*Model) *Model // See Core.UnionAll.
// GetAll executes a query and returns all rows as Result.
// It's a convenience wrapper around Query.
GetAll(ctx context.Context, sql string, args ...interface{}) (Result, error)
// GetOne executes a query and returns the first row as Record.
// It's useful when you expect only one row to be returned.
GetOne(ctx context.Context, sql string, args ...interface{}) (Record, error)
// GetValue executes a query and returns the first column of the first row.
// It's useful for queries like SELECT COUNT(*) or getting a single value.
GetValue(ctx context.Context, sql string, args ...interface{}) (Value, error)
// GetArray executes a query and returns the first column of all rows.
// It's useful for queries like SELECT id FROM table.
GetArray(ctx context.Context, sql string, args ...interface{}) ([]Value, error)
// GetCount executes a COUNT query and returns the result as an integer.
// It's a convenience method for counting rows.
GetCount(ctx context.Context, sql string, args ...interface{}) (int, error)
// GetScan executes a query and scans the result into the given object pointer.
// It automatically maps database columns to struct fields or slice elements.
GetScan(ctx context.Context, objPointer interface{}, sql string, args ...interface{}) error
// Union combines multiple SELECT queries using UNION operator.
// It returns a new Model that represents the combined query.
Union(unions ...*Model) *Model
// UnionAll combines multiple SELECT queries using UNION ALL operator.
// Unlike Union, it keeps duplicate rows in the result.
UnionAll(unions ...*Model) *Model
// ===========================================================================
// Master/Slave specification support.
// ===========================================================================
Master(schema ...string) (*sql.DB, error) // See Core.Master.
Slave(schema ...string) (*sql.DB, error) // See Core.Slave.
// Master returns a connection to the master database node.
// The optional schema parameter specifies which database schema to use.
Master(schema ...string) (*sql.DB, error)
// Slave returns a connection to a slave database node.
// The optional schema parameter specifies which database schema to use.
Slave(schema ...string) (*sql.DB, error)
// ===========================================================================
// Ping-Pong.
// ===========================================================================
PingMaster() error // See Core.PingMaster.
PingSlave() error // See Core.PingSlave.
// PingMaster checks if the master database node is accessible.
// It returns an error if the connection fails.
PingMaster() error
// PingSlave checks if any slave database node is accessible.
// It returns an error if no slave connections are available.
PingSlave() error
// ===========================================================================
// Transaction.
// ===========================================================================
Begin(ctx context.Context) (TX, error) // See Core.Begin.
Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) error // See Core.Transaction.
// Begin starts a new transaction and returns a TX interface.
// The returned TX must be committed or rolled back to release resources.
Begin(ctx context.Context) (TX, error)
// BeginWithOptions starts a new transaction with the given options and returns a TX interface.
// The options allow specifying isolation level and read-only mode.
// The returned TX must be committed or rolled back to release resources.
BeginWithOptions(ctx context.Context, opts TxOptions) (TX, error)
// Transaction executes a function within a transaction.
// It automatically handles commit/rollback based on whether f returns an error.
Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) error
// TransactionWithOptions executes a function within a transaction with specific options.
// It allows customizing transaction behavior like isolation level and timeout.
TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) error
// ===========================================================================
// Configuration methods.
// ===========================================================================
GetCache() *gcache.Cache // See Core.GetCache.
SetDebug(debug bool) // See Core.SetDebug.
GetDebug() bool // See Core.GetDebug.
GetSchema() string // See Core.GetSchema.
GetPrefix() string // See Core.GetPrefix.
GetGroup() string // See Core.GetGroup.
SetDryRun(enabled bool) // See Core.SetDryRun.
GetDryRun() bool // See Core.GetDryRun.
SetLogger(logger glog.ILogger) // See Core.SetLogger.
GetLogger() glog.ILogger // See Core.GetLogger.
GetConfig() *ConfigNode // See Core.GetConfig.
SetMaxIdleConnCount(n int) // See Core.SetMaxIdleConnCount.
SetMaxOpenConnCount(n int) // See Core.SetMaxOpenConnCount.
SetMaxConnLifeTime(d time.Duration) // See Core.SetMaxConnLifeTime.
// GetCache returns the cache instance used by this database.
// The cache is used for query results caching.
GetCache() *gcache.Cache
// SetDebug enables or disables debug mode for SQL logging.
// When enabled, all SQL statements and their execution time are logged.
SetDebug(debug bool)
// GetDebug returns whether debug mode is enabled.
GetDebug() bool
// GetSchema returns the current database schema name.
GetSchema() string
// GetPrefix returns the table name prefix used by this database.
GetPrefix() string
// GetGroup returns the configuration group name of this database.
GetGroup() string
// SetDryRun enables or disables dry-run mode.
// In dry-run mode, SQL statements are generated but not executed.
SetDryRun(enabled bool)
// GetDryRun returns whether dry-run mode is enabled.
GetDryRun() bool
// SetLogger sets a custom logger for database operations.
// The logger must implement glog.ILogger interface.
SetLogger(logger glog.ILogger)
// GetLogger returns the current logger used by this database.
GetLogger() glog.ILogger
// GetConfig returns the configuration node used by this database.
GetConfig() *ConfigNode
// SetMaxIdleConnCount sets the maximum number of idle connections in the pool.
SetMaxIdleConnCount(n int)
// SetMaxOpenConnCount sets the maximum number of open connections to the database.
SetMaxOpenConnCount(n int)
// SetMaxConnLifeTime sets the maximum amount of time a connection may be reused.
SetMaxConnLifeTime(d time.Duration)
// ===========================================================================
// Utility methods.
// ===========================================================================
Stats(ctx context.Context) []StatsItem // See Core.Stats.
GetCtx() context.Context // See Core.GetCtx.
GetCore() *Core // See Core.GetCore
GetChars() (charLeft string, charRight string) // See Core.GetChars.
Tables(ctx context.Context, schema ...string) (tables []string, err error) // See Core.Tables. The driver must implement this function.
TableFields(ctx context.Context, table string, schema ...string) (map[string]*TableField, error) // See Core.TableFields. The driver must implement this function.
ConvertValueForField(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) // See Core.ConvertValueForField
ConvertValueForLocal(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) // See Core.ConvertValueForLocal
CheckLocalTypeForField(ctx context.Context, fieldType string, fieldValue interface{}) (LocalType, error) // See Core.CheckLocalTypeForField
FormatUpsert(columns []string, list List, option DoInsertOption) (string, error) // See Core.DoFormatUpsert
OrderRandomFunction() string // See Core.OrderRandomFunction
// Stats returns statistics about the database connection pool.
// It includes information like the number of active and idle connections.
Stats(ctx context.Context) []StatsItem
// GetCtx returns the context associated with this database instance.
GetCtx() context.Context
// GetCore returns the underlying Core instance of this database.
GetCore() *Core
// GetChars returns the left and right quote characters used for escaping identifiers.
// For example, in MySQL these are backticks: ` and `.
GetChars() (charLeft string, charRight string)
// Tables returns a list of all table names in the specified schema.
// If no schema is specified, it uses the default schema.
Tables(ctx context.Context, schema ...string) (tables []string, err error)
// TableFields returns detailed information about all fields in the specified table.
// The returned map keys are field names and values contain field metadata.
TableFields(ctx context.Context, table string, schema ...string) (map[string]*TableField, error)
// ConvertValueForField converts a value to the appropriate type for a database field.
// It handles type conversion from Go types to database-specific types.
ConvertValueForField(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error)
// ConvertValueForLocal converts a database value to the appropriate Go type.
// It handles type conversion from database-specific types to Go types.
ConvertValueForLocal(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error)
// CheckLocalTypeForField checks if a Go value is compatible with a database field type.
// It returns the appropriate LocalType and any conversion errors.
CheckLocalTypeForField(ctx context.Context, fieldType string, fieldValue interface{}) (LocalType, error)
// FormatUpsert formats an upsert (INSERT ... ON DUPLICATE KEY UPDATE) statement.
// It generates the appropriate SQL based on the columns, values, and options provided.
FormatUpsert(columns []string, list List, option DoInsertOption) (string, error)
// OrderRandomFunction returns the SQL function for random ordering.
// The implementation is database-specific (e.g., RAND() for MySQL).
OrderRandomFunction() string
}
// TX defines the interfaces for ORM transaction operations.
type TX interface {
Link
// Ctx binds a context to current transaction.
// The context is used for operations like timeout control.
Ctx(ctx context.Context) TX
// Raw creates and returns a model based on a raw SQL.
// The rawSql can contain placeholders ? and corresponding args.
Raw(rawSql string, args ...interface{}) *Model
// Model creates and returns a Model from given table name/struct.
// The parameter can be table name as string, or struct/*struct type.
Model(tableNameQueryOrStruct ...interface{}) *Model
// With creates and returns a Model from given object.
// It automatically analyzes the object and generates corresponding SQL.
With(object interface{}) *Model
// ===========================================================================
// Nested transaction if necessary.
// ===========================================================================
// Begin starts a nested transaction.
// It creates a new savepoint for current transaction.
Begin() error
// Commit commits current transaction/savepoint.
// For nested transactions, it releases the current savepoint.
Commit() error
// Rollback rolls back current transaction/savepoint.
// For nested transactions, it rolls back to the current savepoint.
Rollback() error
// Transaction executes given function in a nested transaction.
// It automatically handles commit/rollback based on function's error return.
Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error)
// TransactionWithOptions executes given function in a nested transaction with options.
// It allows customizing transaction behavior like isolation level.
TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) error
// ===========================================================================
// Core method.
// ===========================================================================
// Query executes a query that returns rows using given SQL and arguments.
// The args are for any placeholder parameters in the query.
Query(sql string, args ...interface{}) (result Result, err error)
// Exec executes a query that doesn't return rows.
// For example: INSERT, UPDATE, DELETE.
Exec(sql string, args ...interface{}) (sql.Result, error)
// Prepare creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the statement.
Prepare(sql string) (*Stmt, error)
// ===========================================================================
// Query.
// ===========================================================================
// GetAll executes a query and returns all rows as Result.
// It's a convenient wrapper for Query.
GetAll(sql string, args ...interface{}) (Result, error)
// GetOne executes a query and returns the first row as Record.
// It's useful when you expect only one row to be returned.
GetOne(sql string, args ...interface{}) (Record, error)
// GetStruct executes a query and scans the result into given struct.
// The obj should be a pointer to struct.
GetStruct(obj interface{}, sql string, args ...interface{}) error
// GetStructs executes a query and scans all results into given struct slice.
// The objPointerSlice should be a pointer to slice of struct.
GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error
// GetScan executes a query and scans the result into given variables.
// The pointer can be type of struct/*struct/[]struct/[]*struct.
GetScan(pointer interface{}, sql string, args ...interface{}) error
// GetValue executes a query and returns the first column of first row.
// It's useful for queries like SELECT COUNT(*).
GetValue(sql string, args ...interface{}) (Value, error)
// GetCount executes a query that should return a count value.
// It's a convenient wrapper for count queries.
GetCount(sql string, args ...interface{}) (int64, error)
// ===========================================================================
// CURD.
// ===========================================================================
// Insert inserts one or multiple records into table.
// The data can be map/struct/*struct/[]map/[]struct/[]*struct.
Insert(table string, data interface{}, batch ...int) (sql.Result, error)
// InsertIgnore inserts one or multiple records with IGNORE option.
// It ignores records that would cause duplicate key conflicts.
InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error)
// InsertAndGetId inserts one record and returns its id value.
// It's commonly used with auto-increment primary key.
InsertAndGetId(table string, data interface{}, batch ...int) (int64, error)
// Replace inserts or replaces records using REPLACE INTO syntax.
// Existing records with same unique key will be deleted and re-inserted.
Replace(table string, data interface{}, batch ...int) (sql.Result, error)
// Save inserts or updates records using INSERT ... ON DUPLICATE KEY UPDATE syntax.
// It updates existing records instead of replacing them entirely.
Save(table string, data interface{}, batch ...int) (sql.Result, error)
// Update updates records in table that match given condition.
// The data can be map/struct, and condition supports various formats.
Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
// Delete deletes records from table that match given condition.
// The condition supports various formats with optional arguments.
Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error)
// ===========================================================================
// Utility methods.
// ===========================================================================
// GetCtx returns the context that is bound to current transaction.
GetCtx() context.Context
// GetDB returns the underlying DB interface object.
GetDB() DB
// GetSqlTX returns the underlying *sql.Tx object.
// Note: be very careful when using this method.
GetSqlTX() *sql.Tx
// IsClosed checks if current transaction is closed.
// A transaction is closed after Commit or Rollback.
IsClosed() bool
// ===========================================================================
// Save point feature.
// ===========================================================================
// SavePoint creates a save point with given name.
// It's used in nested transactions to create rollback points.
SavePoint(point string) error
// RollbackTo rolls back transaction to previously created save point.
// If the save point doesn't exist, it returns an error.
RollbackTo(point string) error
}
@ -287,6 +535,7 @@ type DoCommitInput struct {
Sql string
Args []interface{}
Type SqlType
TxOptions sql.TxOptions
IsTransaction bool
}
@ -380,9 +629,6 @@ const (
defaultMaxIdleConnCount = 10 // Max idle connection count in pool.
defaultMaxOpenConnCount = 0 // Max open connection count in pool. Default is no limit.
defaultMaxConnLifeTime = 30 * time.Second // Max lifetime for per connection in pool in seconds.
ctxTimeoutTypeExec = 0
ctxTimeoutTypeQuery = 1
ctxTimeoutTypePrepare = 2
cachePrefixTableFields = `TableFields:`
cachePrefixSelectCache = `SelectCache:`
commandEnvKeyForDryRun = "gf.gdb.dryrun"
@ -396,6 +642,15 @@ const (
linkPattern = `(\w+):([\w\-\$]*):(.*?)@(\w+?)\((.+?)\)/{0,1}([^\?]*)\?{0,1}(.*)`
)
type ctxTimeoutType int
const (
ctxTimeoutTypeExec ctxTimeoutType = iota
ctxTimeoutTypeQuery
ctxTimeoutTypePrepare
ctxTimeoutTypeTrans
)
type SelectType int
const (
@ -640,7 +895,7 @@ func Instance(name ...string) (db DB, err error) {
// The returned node is a clone of configuration node, which is safe for later modification.
//
// The parameter `master` specifies whether retrieving a master node, or else a slave node
// if master-slave configured.
// if master-slave nodes are configured.
func getConfigNodeByGroup(group string, master bool) (*ConfigNode, error) {
if list, ok := configs.config[group]; ok {
// Separates master and slave configuration nodes array.

View File

@ -72,24 +72,30 @@ func (c *Core) GetCtx() context.Context {
}
// GetCtxTimeout returns the context and cancel function for specified timeout type.
func (c *Core) GetCtxTimeout(ctx context.Context, timeoutType int) (context.Context, context.CancelFunc) {
func (c *Core) GetCtxTimeout(ctx context.Context, timeoutType ctxTimeoutType) (context.Context, context.CancelFunc) {
if ctx == nil {
ctx = c.db.GetCtx()
} else {
ctx = context.WithValue(ctx, "WrappedByGetCtxTimeout", nil)
}
var config = c.db.GetConfig()
switch timeoutType {
case ctxTimeoutTypeExec:
if c.db.GetConfig().ExecTimeout > 0 {
return context.WithTimeout(ctx, c.db.GetConfig().ExecTimeout)
return context.WithTimeout(ctx, config.ExecTimeout)
}
case ctxTimeoutTypeQuery:
if c.db.GetConfig().QueryTimeout > 0 {
return context.WithTimeout(ctx, c.db.GetConfig().QueryTimeout)
return context.WithTimeout(ctx, config.QueryTimeout)
}
case ctxTimeoutTypePrepare:
if c.db.GetConfig().PrepareTimeout > 0 {
return context.WithTimeout(ctx, c.db.GetConfig().PrepareTimeout)
return context.WithTimeout(ctx, config.PrepareTimeout)
}
case ctxTimeoutTypeTrans:
if c.db.GetConfig().TranTimeout > 0 {
return context.WithTimeout(ctx, config.TranTimeout)
}
default:
panic(gerror.NewCodef(gcode.CodeInvalidParameter, "invalid context timeout type: %d", timeoutType))

View File

@ -9,25 +9,50 @@ package gdb
import (
"context"
"database/sql"
"reflect"
"github.com/gogf/gf/v2/container/gtype"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/reflection"
"github.com/gogf/gf/v2/text/gregex"
"github.com/gogf/gf/v2/util/gconv"
)
// TXCore is the struct for transaction management.
type TXCore struct {
db DB // db is the current gdb database manager.
tx *sql.Tx // tx is the raw and underlying transaction manager.
ctx context.Context // ctx is the context for this transaction only.
master *sql.DB // master is the raw and underlying database manager.
transactionId string // transactionId is a unique id generated by this object for this transaction.
transactionCount int // transactionCount marks the times that Begins.
isClosed bool // isClosed marks this transaction has already been committed or rolled back.
// Propagation defines transaction propagation behavior.
type Propagation string
const (
// PropagationRequired starts a new transaction if not in a transaction,
// or uses the existing transaction if already in a transaction.
PropagationRequired Propagation = "" // REQUIRED
// PropagationSupports executes within the existing transaction if present,
// otherwise executes without transaction.
PropagationSupports Propagation = "SUPPORTS"
// PropagationRequiresNew starts a new transaction, and suspends the current transaction if one exists.
PropagationRequiresNew Propagation = "REQUIRES_NEW"
// PropagationNested starts a nested transaction if already in a transaction,
// or behaves like PropagationRequired if not in a transaction.
PropagationNested Propagation = "NESTED"
// PropagationNotSupported executes non-transactional, suspends any existing transaction.
PropagationNotSupported Propagation = "NOT_SUPPORTED"
// PropagationMandatory executes in a transaction, fails if no existing transaction.
PropagationMandatory Propagation = "MANDATORY"
// PropagationNever executes non-transactional, fails if in an existing transaction.
PropagationNever Propagation = "NEVER"
)
// TxOptions defines options for transaction control.
type TxOptions struct {
// Propagation specifies the propagation behavior.
Propagation Propagation
// Isolation is the transaction isolation level.
// If zero, the driver or database's default level is used.
Isolation sql.IsolationLevel
// ReadOnly is used to mark the transaction as read-only.
ReadOnly bool
}
const (
@ -38,15 +63,38 @@ const (
var transactionIdGenerator = gtype.NewUint64()
// DefaultTxOptions returns the default transaction options.
func DefaultTxOptions() TxOptions {
return TxOptions{
Propagation: PropagationRequired,
}
}
// Begin starts and returns the transaction object.
// You should call Commit or Rollback functions of the transaction object
// if you no longer use the transaction. Commit or Rollback functions will also
// close the transaction automatically.
func (c *Core) Begin(ctx context.Context) (tx TX, err error) {
return c.doBeginCtx(ctx)
return c.BeginWithOptions(ctx, DefaultTxOptions())
}
func (c *Core) doBeginCtx(ctx context.Context) (TX, error) {
// BeginWithOptions starts and returns the transaction object with given options.
// The options allow specifying the isolation level and read-only mode.
// You should call Commit or Rollback functions of the transaction object
// if you no longer use the transaction. Commit or Rollback functions will also
// close the transaction automatically.
func (c *Core) BeginWithOptions(ctx context.Context, opts TxOptions) (tx TX, err error) {
if ctx == nil {
ctx = c.db.GetCtx()
}
ctx = c.injectInternalCtxData(ctx)
return c.doBeginCtx(ctx, sql.TxOptions{
Isolation: opts.Isolation,
ReadOnly: opts.ReadOnly,
})
}
func (c *Core) doBeginCtx(ctx context.Context, opts sql.TxOptions) (TX, error) {
master, err := c.db.Master()
if err != nil {
return nil, err
@ -56,6 +104,7 @@ func (c *Core) doBeginCtx(ctx context.Context) (TX, error) {
Db: master,
Sql: "BEGIN",
Type: SqlTypeBegin,
TxOptions: opts,
IsTransaction: true,
})
return out.Tx, err
@ -69,22 +118,105 @@ func (c *Core) doBeginCtx(ctx context.Context) (TX, error) {
// Note that, you should not Commit or Rollback the transaction in function `f`
// as it is automatically handled by this function.
func (c *Core) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) {
return c.TransactionWithOptions(ctx, DefaultTxOptions(), f)
}
// TransactionWithOptions wraps the transaction logic with propagation options using function `f`.
func (c *Core) TransactionWithOptions(
ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error,
) (err error) {
if ctx == nil {
ctx = c.db.GetCtx()
}
ctx = c.injectInternalCtxData(ctx)
// Check transaction object from context.
var tx TX
tx = TXFromCtx(ctx, c.db.GetGroup())
if tx != nil {
return tx.Transaction(ctx, f)
// Check current transaction from context
var (
group = c.db.GetGroup()
currentTx = TXFromCtx(ctx, group)
)
switch opts.Propagation {
case PropagationRequired:
if currentTx != nil {
return currentTx.Transaction(ctx, f)
}
return c.createNewTransaction(ctx, opts, f)
case PropagationSupports:
return f(ctx, currentTx)
case PropagationMandatory:
if currentTx == nil {
return gerror.NewCode(
gcode.CodeInvalidOperation,
"transaction propagation MANDATORY requires an existing transaction",
)
}
return f(ctx, currentTx)
case PropagationRequiresNew:
ctx = WithoutTX(ctx, group)
return c.createNewTransaction(ctx, opts, f)
case PropagationNotSupported:
ctx = WithoutTX(ctx, group)
return f(ctx, nil)
case PropagationNever:
if currentTx != nil {
return gerror.NewCode(
gcode.CodeInvalidOperation,
"transaction propagation NEVER cannot run within an existing transaction",
)
}
return f(ctx, nil)
case PropagationNested:
if currentTx != nil {
// Create savepoint for nested transaction
if err = currentTx.Begin(); err != nil {
return err
}
defer func() {
if err != nil {
if rbErr := currentTx.Rollback(); rbErr != nil {
err = gerror.Wrap(err, rbErr.Error())
}
}
}()
return f(ctx, currentTx)
}
return c.createNewTransaction(ctx, opts, f)
default:
return gerror.NewCodef(
gcode.CodeInvalidParameter,
"unsupported propagation behavior: %s",
opts.Propagation,
)
}
tx, err = c.doBeginCtx(ctx)
}
// createNewTransaction handles creating and managing a new transaction
func (c *Core) createNewTransaction(
ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error,
) (err error) {
// Begin transaction with options
tx, err := c.doBeginCtx(ctx, sql.TxOptions{
Isolation: opts.Isolation,
ReadOnly: opts.ReadOnly,
})
if err != nil {
return err
}
// Inject transaction object into context.
tx = tx.Ctx(WithTX(tx.GetCtx(), tx))
// Inject transaction object into context
ctx = WithTX(tx.GetCtx(), tx)
err = callTxFunc(tx.Ctx(ctx), f)
return
}
func callTxFunc(tx TX, f func(ctx context.Context, tx TX) error) (err error) {
defer func() {
if err == nil {
if exception := recover(); exception != nil {
@ -128,6 +260,12 @@ func WithTX(ctx context.Context, tx TX) context.Context {
return ctx
}
// WithoutTX removed transaction object from context and returns a new context.
func WithoutTX(ctx context.Context, group string) context.Context {
ctx = context.WithValue(ctx, transactionKeyForContext(group), nil)
return ctx
}
// TXFromCtx retrieves and returns transaction object from context.
// It is usually used in nested transaction feature, and it returns nil if it is not set previously.
func TXFromCtx(ctx context.Context, group string) TX {
@ -150,395 +288,3 @@ func TXFromCtx(ctx context.Context, group string) TX {
func transactionKeyForContext(group string) string {
return contextTransactionKeyPrefix + group
}
// transactionKeyForNestedPoint forms and returns the transaction key at current save point.
func (tx *TXCore) transactionKeyForNestedPoint() string {
return tx.db.GetCore().QuoteWord(transactionPointerPrefix + gconv.String(tx.transactionCount))
}
// Ctx sets the context for current transaction.
func (tx *TXCore) Ctx(ctx context.Context) TX {
tx.ctx = ctx
if tx.ctx != nil {
tx.ctx = tx.db.GetCore().injectInternalCtxData(tx.ctx)
}
return tx
}
// GetCtx returns the context for current transaction.
func (tx *TXCore) GetCtx() context.Context {
return tx.ctx
}
// GetDB returns the DB for current transaction.
func (tx *TXCore) GetDB() DB {
return tx.db
}
// GetSqlTX returns the underlying transaction object for current transaction.
func (tx *TXCore) GetSqlTX() *sql.Tx {
return tx.tx
}
// Commit commits current transaction.
// Note that it releases previous saved transaction point if it's in a nested transaction procedure,
// or else it commits the hole transaction.
func (tx *TXCore) Commit() error {
if tx.transactionCount > 0 {
tx.transactionCount--
_, err := tx.Exec("RELEASE SAVEPOINT " + tx.transactionKeyForNestedPoint())
return err
}
_, err := tx.db.DoCommit(tx.ctx, DoCommitInput{
Tx: tx.tx,
Sql: "COMMIT",
Type: SqlTypeTXCommit,
IsTransaction: true,
})
if err == nil {
tx.isClosed = true
}
return err
}
// Rollback aborts current transaction.
// Note that it aborts current transaction if it's in a nested transaction procedure,
// or else it aborts the hole transaction.
func (tx *TXCore) Rollback() error {
if tx.transactionCount > 0 {
tx.transactionCount--
_, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.transactionKeyForNestedPoint())
return err
}
_, err := tx.db.DoCommit(tx.ctx, DoCommitInput{
Tx: tx.tx,
Sql: "ROLLBACK",
Type: SqlTypeTXRollback,
IsTransaction: true,
})
if err == nil {
tx.isClosed = true
}
return err
}
// IsClosed checks and returns this transaction has already been committed or rolled back.
func (tx *TXCore) IsClosed() bool {
return tx.isClosed
}
// Begin starts a nested transaction procedure.
func (tx *TXCore) Begin() error {
_, err := tx.Exec("SAVEPOINT " + tx.transactionKeyForNestedPoint())
if err != nil {
return err
}
tx.transactionCount++
return nil
}
// SavePoint performs `SAVEPOINT xxx` SQL statement that saves transaction at current point.
// The parameter `point` specifies the point name that will be saved to server.
func (tx *TXCore) SavePoint(point string) error {
_, err := tx.Exec("SAVEPOINT " + tx.db.GetCore().QuoteWord(point))
return err
}
// RollbackTo performs `ROLLBACK TO SAVEPOINT xxx` SQL statement that rollbacks to specified saved transaction.
// The parameter `point` specifies the point name that was saved previously.
func (tx *TXCore) RollbackTo(point string) error {
_, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.db.GetCore().QuoteWord(point))
return err
}
// Transaction wraps the transaction logic using function `f`.
// It rollbacks the transaction and returns the error from function `f` if
// it returns non-nil error. It commits the transaction and returns nil if
// function `f` returns nil.
//
// Note that, you should not Commit or Rollback the transaction in function `f`
// as it is automatically handled by this function.
func (tx *TXCore) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) {
if ctx != nil {
tx.ctx = ctx
}
// Check transaction object from context.
if TXFromCtx(tx.ctx, tx.db.GetGroup()) == nil {
// Inject transaction object into context.
tx.ctx = WithTX(tx.ctx, tx)
}
err = tx.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
if exception := recover(); exception != nil {
if v, ok := exception.(error); ok && gerror.HasStack(v) {
err = v
} else {
err = gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception)
}
}
}
if err != nil {
if e := tx.Rollback(); e != nil {
err = e
}
} else {
if e := tx.Commit(); e != nil {
err = e
}
}
}()
err = f(tx.ctx, tx)
return
}
// Query does query operation on transaction.
// See Core.Query.
func (tx *TXCore) Query(sql string, args ...interface{}) (result Result, err error) {
return tx.db.DoQuery(tx.ctx, &txLink{tx.tx}, sql, args...)
}
// Exec does none query operation on transaction.
// See Core.Exec.
func (tx *TXCore) Exec(sql string, args ...interface{}) (sql.Result, error) {
return tx.db.DoExec(tx.ctx, &txLink{tx.tx}, sql, args...)
}
// Prepare creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
func (tx *TXCore) Prepare(sql string) (*Stmt, error) {
return tx.db.DoPrepare(tx.ctx, &txLink{tx.tx}, sql)
}
// GetAll queries and returns data records from database.
func (tx *TXCore) GetAll(sql string, args ...interface{}) (Result, error) {
return tx.Query(sql, args...)
}
// GetOne queries and returns one record from database.
func (tx *TXCore) GetOne(sql string, args ...interface{}) (Record, error) {
list, err := tx.GetAll(sql, args...)
if err != nil {
return nil, err
}
if len(list) > 0 {
return list[0], nil
}
return nil, nil
}
// GetStruct queries one record from database and converts it to given struct.
// The parameter `pointer` should be a pointer to struct.
func (tx *TXCore) GetStruct(obj interface{}, sql string, args ...interface{}) error {
one, err := tx.GetOne(sql, args...)
if err != nil {
return err
}
return one.Struct(obj)
}
// GetStructs queries records from database and converts them to given struct.
// The parameter `pointer` should be type of struct slice: []struct/[]*struct.
func (tx *TXCore) GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error {
all, err := tx.GetAll(sql, args...)
if err != nil {
return err
}
return all.Structs(objPointerSlice)
}
// GetScan queries one or more records from database and converts them to given struct or
// struct array.
//
// If parameter `pointer` is type of struct pointer, it calls GetStruct internally for
// the conversion. If parameter `pointer` is type of slice, it calls GetStructs internally
// for conversion.
func (tx *TXCore) GetScan(pointer interface{}, sql string, args ...interface{}) error {
reflectInfo := reflection.OriginTypeAndKind(pointer)
if reflectInfo.InputKind != reflect.Ptr {
return gerror.NewCodef(
gcode.CodeInvalidParameter,
"params should be type of pointer, but got: %v",
reflectInfo.InputKind,
)
}
switch reflectInfo.OriginKind {
case reflect.Array, reflect.Slice:
return tx.GetStructs(pointer, sql, args...)
case reflect.Struct:
return tx.GetStruct(pointer, sql, args...)
}
return gerror.NewCodef(
gcode.CodeInvalidParameter,
`in valid parameter type "%v", of which element type should be type of struct/slice`,
reflectInfo.InputType,
)
}
// GetValue queries and returns the field value from database.
// The sql should query only one field from database, or else it returns only one
// field of the result.
func (tx *TXCore) GetValue(sql string, args ...interface{}) (Value, error) {
one, err := tx.GetOne(sql, args...)
if err != nil {
return nil, err
}
for _, v := range one {
return v, nil
}
return nil, nil
}
// GetCount queries and returns the count from database.
func (tx *TXCore) GetCount(sql string, args ...interface{}) (int64, error) {
if !gregex.IsMatchString(`(?i)SELECT\s+COUNT\(.+\)\s+FROM`, sql) {
sql, _ = gregex.ReplaceString(`(?i)(SELECT)\s+(.+)\s+(FROM)`, `$1 COUNT($2) $3`, sql)
}
value, err := tx.GetValue(sql, args...)
if err != nil {
return 0, err
}
return value.Int64(), nil
}
// Insert does "INSERT INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it returns error.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `batch` specifies the batch operation count when given data is slice.
func (tx *TXCore) Insert(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Insert()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Insert()
}
// InsertIgnore does "INSERT IGNORE INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it ignores the inserting.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `batch` specifies the batch operation count when given data is slice.
func (tx *TXCore) InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertIgnore()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).InsertIgnore()
}
// InsertAndGetId performs action Insert and returns the last insert id that automatically generated.
func (tx *TXCore) InsertAndGetId(table string, data interface{}, batch ...int) (int64, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertAndGetId()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).InsertAndGetId()
}
// Replace does "REPLACE INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it deletes the record
// and inserts a new one.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// If given data is type of slice, it then does batch replacing, and the optional parameter
// `batch` specifies the batch operation count.
func (tx *TXCore) Replace(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Replace()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Replace()
}
// Save does "INSERT INTO ... ON DUPLICATE KEY UPDATE..." statement for the table.
// It updates the record if there's primary or unique index in the saving data,
// or else it inserts a new record into the table.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// If given data is type of slice, it then does batch saving, and the optional parameter
// `batch` specifies the batch operation count.
func (tx *TXCore) Save(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Save()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Save()
}
// Update does "UPDATE ... " statement for the table.
//
// The parameter `data` can be type of string/map/gmap/struct/*struct, etc.
// Eg: "uid=10000", "uid", 10000, g.Map{"uid": 10000, "name":"john"}
//
// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc.
// It is commonly used with parameter `args`.
// Eg:
// "uid=10000",
// "uid", 10000
// "money>? AND name like ?", 99999, "vip_%"
// "status IN (?)", g.Slice{1,2,3}
// "age IN(?,?)", 18, 50
// User{ Id : 1, UserName : "john"}.
func (tx *TXCore) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
return tx.Model(table).Ctx(tx.ctx).Data(data).Where(condition, args...).Update()
}
// Delete does "DELETE FROM ... " statement for the table.
//
// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc.
// It is commonly used with parameter `args`.
// Eg:
// "uid=10000",
// "uid", 10000
// "money>? AND name like ?", 99999, "vip_%"
// "status IN (?)", g.Slice{1,2,3}
// "age IN(?,?)", 18, 50
// User{ Id : 1, UserName : "john"}.
func (tx *TXCore) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
return tx.Model(table).Ctx(tx.ctx).Where(condition, args...).Delete()
}
// QueryContext implements interface function Link.QueryContext.
func (tx *TXCore) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
return tx.tx.QueryContext(ctx, sql, args...)
}
// ExecContext implements interface function Link.ExecContext.
func (tx *TXCore) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
return tx.tx.ExecContext(ctx, sql, args...)
}
// PrepareContext implements interface function Link.PrepareContext.
func (tx *TXCore) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) {
return tx.tx.PrepareContext(ctx, sql)
}
// IsOnMaster implements interface function Link.IsOnMaster.
func (tx *TXCore) IsOnMaster() bool {
return true
}
// IsTransaction implements interface function Link.IsTransaction.
func (tx *TXCore) IsTransaction() bool {
return true
}

View File

@ -0,0 +1,412 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import (
"context"
"database/sql"
"reflect"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/reflection"
"github.com/gogf/gf/v2/text/gregex"
"github.com/gogf/gf/v2/util/gconv"
)
// TXCore is the struct for transaction management.
type TXCore struct {
db DB // db is the current gdb database manager.
tx *sql.Tx // tx is the raw and underlying transaction manager.
ctx context.Context // ctx is the context for this transaction only.
master *sql.DB // master is the raw and underlying database manager.
transactionId string // transactionId is a unique id generated by this object for this transaction.
transactionCount int // transactionCount marks the times that Begins.
isClosed bool // isClosed marks this transaction has already been committed or rolled back.
}
// transactionKeyForNestedPoint forms and returns the transaction key at current save point.
func (tx *TXCore) transactionKeyForNestedPoint() string {
return tx.db.GetCore().QuoteWord(
transactionPointerPrefix + gconv.String(tx.transactionCount),
)
}
// Ctx sets the context for current transaction.
func (tx *TXCore) Ctx(ctx context.Context) TX {
tx.ctx = ctx
if tx.ctx != nil {
tx.ctx = tx.db.GetCore().injectInternalCtxData(tx.ctx)
}
return tx
}
// GetCtx returns the context for current transaction.
func (tx *TXCore) GetCtx() context.Context {
return tx.ctx
}
// GetDB returns the DB for current transaction.
func (tx *TXCore) GetDB() DB {
return tx.db
}
// GetSqlTX returns the underlying transaction object for current transaction.
func (tx *TXCore) GetSqlTX() *sql.Tx {
return tx.tx
}
// Commit commits current transaction.
// Note that it releases previous saved transaction point if it's in a nested transaction procedure,
// or else it commits the hole transaction.
func (tx *TXCore) Commit() error {
if tx.transactionCount > 0 {
tx.transactionCount--
_, err := tx.Exec("RELEASE SAVEPOINT " + tx.transactionKeyForNestedPoint())
return err
}
_, err := tx.db.DoCommit(tx.ctx, DoCommitInput{
Tx: tx.tx,
Sql: "COMMIT",
Type: SqlTypeTXCommit,
IsTransaction: true,
})
if err == nil {
tx.isClosed = true
}
return err
}
// Rollback aborts current transaction.
// Note that it aborts current transaction if it's in a nested transaction procedure,
// or else it aborts the hole transaction.
func (tx *TXCore) Rollback() error {
if tx.transactionCount > 0 {
tx.transactionCount--
_, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.transactionKeyForNestedPoint())
return err
}
_, err := tx.db.DoCommit(tx.ctx, DoCommitInput{
Tx: tx.tx,
Sql: "ROLLBACK",
Type: SqlTypeTXRollback,
IsTransaction: true,
})
if err == nil {
tx.isClosed = true
}
return err
}
// IsClosed checks and returns this transaction has already been committed or rolled back.
func (tx *TXCore) IsClosed() bool {
return tx.isClosed
}
// Begin starts a nested transaction procedure.
func (tx *TXCore) Begin() error {
_, err := tx.Exec("SAVEPOINT " + tx.transactionKeyForNestedPoint())
if err != nil {
return err
}
tx.transactionCount++
return nil
}
// SavePoint performs `SAVEPOINT xxx` SQL statement that saves transaction at current point.
// The parameter `point` specifies the point name that will be saved to server.
func (tx *TXCore) SavePoint(point string) error {
_, err := tx.Exec("SAVEPOINT " + tx.db.GetCore().QuoteWord(point))
return err
}
// RollbackTo performs `ROLLBACK TO SAVEPOINT xxx` SQL statement that rollbacks to specified saved transaction.
// The parameter `point` specifies the point name that was saved previously.
func (tx *TXCore) RollbackTo(point string) error {
_, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.db.GetCore().QuoteWord(point))
return err
}
// Transaction wraps the transaction logic using function `f`.
// It rollbacks the transaction and returns the error from function `f` if
// it returns non-nil error. It commits the transaction and returns nil if
// function `f` returns nil.
//
// Note that, you should not Commit or Rollback the transaction in function `f`
// as it is automatically handled by this function.
func (tx *TXCore) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) {
if ctx != nil {
tx.ctx = ctx
}
// Check transaction object from context.
if TXFromCtx(tx.ctx, tx.db.GetGroup()) == nil {
// Inject transaction object into context.
tx.ctx = WithTX(tx.ctx, tx)
}
if err = tx.Begin(); err != nil {
return err
}
err = callTxFunc(tx, f)
return
}
// TransactionWithOptions wraps the transaction logic with propagation options using function `f`.
func (tx *TXCore) TransactionWithOptions(
ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error,
) (err error) {
return tx.db.TransactionWithOptions(ctx, opts, f)
}
// Query does query operation on transaction.
// See Core.Query.
func (tx *TXCore) Query(sql string, args ...interface{}) (result Result, err error) {
return tx.db.DoQuery(tx.ctx, &txLink{tx.tx}, sql, args...)
}
// Exec does none query operation on transaction.
// See Core.Exec.
func (tx *TXCore) Exec(sql string, args ...interface{}) (sql.Result, error) {
return tx.db.DoExec(tx.ctx, &txLink{tx.tx}, sql, args...)
}
// Prepare creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
func (tx *TXCore) Prepare(sql string) (*Stmt, error) {
return tx.db.DoPrepare(tx.ctx, &txLink{tx.tx}, sql)
}
// GetAll queries and returns data records from database.
func (tx *TXCore) GetAll(sql string, args ...interface{}) (Result, error) {
return tx.Query(sql, args...)
}
// GetOne queries and returns one record from database.
func (tx *TXCore) GetOne(sql string, args ...interface{}) (Record, error) {
list, err := tx.GetAll(sql, args...)
if err != nil {
return nil, err
}
if len(list) > 0 {
return list[0], nil
}
return nil, nil
}
// GetStruct queries one record from database and converts it to given struct.
// The parameter `pointer` should be a pointer to struct.
func (tx *TXCore) GetStruct(obj interface{}, sql string, args ...interface{}) error {
one, err := tx.GetOne(sql, args...)
if err != nil {
return err
}
return one.Struct(obj)
}
// GetStructs queries records from database and converts them to given struct.
// The parameter `pointer` should be type of struct slice: []struct/[]*struct.
func (tx *TXCore) GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error {
all, err := tx.GetAll(sql, args...)
if err != nil {
return err
}
return all.Structs(objPointerSlice)
}
// GetScan queries one or more records from database and converts them to given struct or
// struct array.
//
// If parameter `pointer` is type of struct pointer, it calls GetStruct internally for
// the conversion. If parameter `pointer` is type of slice, it calls GetStructs internally
// for conversion.
func (tx *TXCore) GetScan(pointer interface{}, sql string, args ...interface{}) error {
reflectInfo := reflection.OriginTypeAndKind(pointer)
if reflectInfo.InputKind != reflect.Ptr {
return gerror.NewCodef(
gcode.CodeInvalidParameter,
"params should be type of pointer, but got: %v",
reflectInfo.InputKind,
)
}
switch reflectInfo.OriginKind {
case reflect.Array, reflect.Slice:
return tx.GetStructs(pointer, sql, args...)
case reflect.Struct:
return tx.GetStruct(pointer, sql, args...)
default:
}
return gerror.NewCodef(
gcode.CodeInvalidParameter,
`in valid parameter type "%v", of which element type should be type of struct/slice`,
reflectInfo.InputType,
)
}
// GetValue queries and returns the field value from database.
// The sql should query only one field from database, or else it returns only one
// field of the result.
func (tx *TXCore) GetValue(sql string, args ...interface{}) (Value, error) {
one, err := tx.GetOne(sql, args...)
if err != nil {
return nil, err
}
for _, v := range one {
return v, nil
}
return nil, nil
}
// GetCount queries and returns the count from database.
func (tx *TXCore) GetCount(sql string, args ...interface{}) (int64, error) {
if !gregex.IsMatchString(`(?i)SELECT\s+COUNT\(.+\)\s+FROM`, sql) {
sql, _ = gregex.ReplaceString(`(?i)(SELECT)\s+(.+)\s+(FROM)`, `$1 COUNT($2) $3`, sql)
}
value, err := tx.GetValue(sql, args...)
if err != nil {
return 0, err
}
return value.Int64(), nil
}
// Insert does "INSERT INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it returns error.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `batch` specifies the batch operation count when given data is slice.
func (tx *TXCore) Insert(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Insert()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Insert()
}
// InsertIgnore does "INSERT IGNORE INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it ignores the inserting.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `batch` specifies the batch operation count when given data is slice.
func (tx *TXCore) InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertIgnore()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).InsertIgnore()
}
// InsertAndGetId performs action Insert and returns the last insert id that automatically generated.
func (tx *TXCore) InsertAndGetId(table string, data interface{}, batch ...int) (int64, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertAndGetId()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).InsertAndGetId()
}
// Replace does "REPLACE INTO ..." statement for the table.
// If there's already one unique record of the data in the table, it deletes the record
// and inserts a new one.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// If given data is type of slice, it then does batch replacing, and the optional parameter
// `batch` specifies the batch operation count.
func (tx *TXCore) Replace(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Replace()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Replace()
}
// Save does "INSERT INTO ... ON DUPLICATE KEY UPDATE..." statement for the table.
// It updates the record if there's primary or unique index in the saving data,
// or else it inserts a new record into the table.
//
// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc.
// Eg:
// Data(g.Map{"uid": 10000, "name":"john"})
// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"})
//
// If given data is type of slice, it then does batch saving, and the optional parameter
// `batch` specifies the batch operation count.
func (tx *TXCore) Save(table string, data interface{}, batch ...int) (sql.Result, error) {
if len(batch) > 0 {
return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Save()
}
return tx.Model(table).Ctx(tx.ctx).Data(data).Save()
}
// Update does "UPDATE ... " statement for the table.
//
// The parameter `data` can be type of string/map/gmap/struct/*struct, etc.
// Eg: "uid=10000", "uid", 10000, g.Map{"uid": 10000, "name":"john"}
//
// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc.
// It is commonly used with parameter `args`.
// Eg:
// "uid=10000",
// "uid", 10000
// "money>? AND name like ?", 99999, "vip_%"
// "status IN (?)", g.Slice{1,2,3}
// "age IN(?,?)", 18, 50
// User{ Id : 1, UserName : "john"}.
func (tx *TXCore) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
return tx.Model(table).Ctx(tx.ctx).Data(data).Where(condition, args...).Update()
}
// Delete does "DELETE FROM ... " statement for the table.
//
// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc.
// It is commonly used with parameter `args`.
// Eg:
// "uid=10000",
// "uid", 10000
// "money>? AND name like ?", 99999, "vip_%"
// "status IN (?)", g.Slice{1,2,3}
// "age IN(?,?)", 18, 50
// User{ Id : 1, UserName : "john"}.
func (tx *TXCore) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
return tx.Model(table).Ctx(tx.ctx).Where(condition, args...).Delete()
}
// QueryContext implements interface function Link.QueryContext.
func (tx *TXCore) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
return tx.tx.QueryContext(ctx, sql, args...)
}
// ExecContext implements interface function Link.ExecContext.
func (tx *TXCore) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
return tx.tx.ExecContext(ctx, sql, args...)
}
// PrepareContext implements interface function Link.PrepareContext.
func (tx *TXCore) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) {
return tx.tx.PrepareContext(ctx, sql)
}
// IsOnMaster implements interface function Link.IsOnMaster.
func (tx *TXCore) IsOnMaster() bool {
return true
}
// IsTransaction implements interface function Link.IsTransaction.
func (tx *TXCore) IsTransaction() bool {
return true
}

View File

@ -187,7 +187,13 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
// Execution cased by type.
switch in.Type {
case SqlTypeBegin:
if sqlTx, err = in.Db.Begin(); err == nil {
ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeTrans)
defer cancelFuncForTimeout()
formattedSql = fmt.Sprintf(
`%s (IosolationLevel: %s, ReadOnly: %t)`,
formattedSql, in.TxOptions.Isolation.String(), in.TxOptions.ReadOnly,
)
if sqlTx, err = in.Db.BeginTx(ctx, &in.TxOptions); err == nil {
out.Tx = &TXCore{
db: c.db,
tx: sqlTx,
@ -206,6 +212,8 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
err = in.Tx.Rollback()
case SqlTypeExecContext:
ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeExec)
defer cancelFuncForTimeout()
if c.db.GetDryRun() {
sqlResult = new(SqlResult)
} else {
@ -214,10 +222,14 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
out.RawResult = sqlResult
case SqlTypeQueryContext:
ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeQuery)
defer cancelFuncForTimeout()
sqlRows, err = in.Link.QueryContext(ctx, in.Sql, in.Args...)
out.RawResult = sqlRows
case SqlTypePrepareContext:
ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypePrepare)
defer cancelFuncForTimeout()
sqlStmt, err = in.Link.PrepareContext(ctx, in.Sql)
out.RawResult = sqlStmt

View File

@ -26,3 +26,17 @@ func (m *Model) Transaction(ctx context.Context, f func(ctx context.Context, tx
}
return m.db.Transaction(ctx, f)
}
// TransactionWithOptions executes transaction with options.
// The parameter `opts` specifies the transaction options.
// The parameter `f` specifies the function that will be called within the transaction.
// If f returns error, the transaction will be rolled back, or else the transaction will be committed.
func (m *Model) TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) (err error) {
if ctx == nil {
ctx = m.GetCtx()
}
if m.tx != nil {
return m.tx.Transaction(ctx, f)
}
return m.db.TransactionWithOptions(ctx, opts, f)
}