1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 03:05:05 +08:00

fix(database/gdb): fix transaction propagation feature (#4199)

This commit is contained in:
John Guo 2025-03-17 14:50:07 +08:00 committed by GitHub
parent abf77fac50
commit 9a61a6970f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 80 additions and 36 deletions

View File

@ -1286,8 +1286,7 @@ func Test_Transaction_Propagation(t *testing.T) {
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
_, err := db.Insert(ctx, table, g.Map{
_, err = db.Insert(ctx, table, g.Map{
"id": 9,
"passport": "non_tx_record",
})
@ -1346,8 +1345,6 @@ func Test_Transaction_Propagation(t *testing.T) {
err := db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNever,
}, func(ctx context.Context, tx gdb.TX) error {
// Should execute without transaction
t.Assert(tx, nil)
_, err := db.Insert(ctx, table, g.Map{
"id": 11,
"passport": "never",
@ -1369,6 +1366,51 @@ func Test_Transaction_Propagation(t *testing.T) {
})
}
func Test_Transaction_Propagation_PropagationSupports(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
table := createTable()
defer dropTable(table)
// scenario1: when in a transaction, use PropagationSupports to execute a transaction
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
// insert in outer tx.
_, err := tx.Insert(table, g.Map{
"id": 1,
})
if err != nil {
return err
}
err = tx.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationSupports,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err = tx2.Insert(table, g.Map{
"id": 2,
})
return gerror.New("error")
})
return err
})
t.AssertNE(err, nil)
// scenario2: when not in a transaction, do not use transaction but direct db link.
err = db.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationSupports,
}, func(ctx context.Context, tx gdb.TX) error {
_, err = tx.Insert(table, g.Map{
"id": 3,
})
return err
})
t.AssertNil(err)
// 查询结果
result, err := db.Model(table).OrderAsc("id").All()
t.AssertNil(err)
t.Assert(len(result), 1)
t.Assert(result[0]["id"], 3)
})
}
func Test_Transaction_Propagation_Complex(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
table1 := createTable()
@ -1389,7 +1431,7 @@ func Test_Transaction_Propagation_Complex(t *testing.T) {
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNested,
}, func(ctx context.Context, tx2 gdb.TX) error {
_, err := tx2.Insert(table1, g.Map{
_, err = tx2.Insert(table1, g.Map{
"id": 2,
"passport": "nested1",
})
@ -1427,10 +1469,7 @@ func Test_Transaction_Propagation_Complex(t *testing.T) {
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
_, err := db.Insert(ctx, table2, g.Map{
_, err = db.Insert(ctx, table2, g.Map{
"id": 5,
"passport": "not_supported",
})
@ -1489,9 +1528,6 @@ func Test_Transaction_Propagation_Complex(t *testing.T) {
err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{
Propagation: gdb.PropagationNotSupported,
}, func(ctx context.Context, tx2 gdb.TX) error {
// Should execute without transaction
t.Assert(tx2, nil)
// Start a new independent transaction
return db.Transaction(ctx, func(ctx context.Context, tx3 gdb.TX) error {
_, err := tx3.Insert(table, g.Map{

View File

@ -19,9 +19,15 @@ import (
type Propagation string
const (
// PropagationNested starts a nested transaction if already in a transaction,
// or behaves like PropagationRequired if not in a transaction.
//
// It is the default behavior.
PropagationNested Propagation = "NESTED"
// PropagationRequired starts a new transaction if not in a transaction,
// or uses the existing transaction if already in a transaction.
PropagationRequired Propagation = "" // REQUIRED
PropagationRequired Propagation = "REQUIRED"
// PropagationSupports executes within the existing transaction if present,
// otherwise executes without transaction.
@ -30,10 +36,6 @@ const (
// PropagationRequiresNew starts a new transaction, and suspends the current transaction if one exists.
PropagationRequiresNew Propagation = "REQUIRES_NEW"
// PropagationNested starts a nested transaction if already in a transaction,
// or behaves like PropagationRequired if not in a transaction.
PropagationNested Propagation = "NESTED"
// PropagationNotSupported executes non-transactional, suspends any existing transaction.
PropagationNotSupported Propagation = "NOT_SUPPORTED"
@ -66,7 +68,8 @@ var transactionIdGenerator = gtype.NewUint64()
// DefaultTxOptions returns the default transaction options.
func DefaultTxOptions() TxOptions {
return TxOptions{
Propagation: PropagationRequired,
// Note the default propagation type is PropagationNested not PropagationRequired.
Propagation: PropagationNested,
}
}
@ -138,11 +141,14 @@ func (c *Core) TransactionWithOptions(
switch opts.Propagation {
case PropagationRequired:
if currentTx != nil {
return currentTx.Transaction(ctx, f)
return f(ctx, currentTx)
}
return c.createNewTransaction(ctx, opts, f)
case PropagationSupports:
if currentTx == nil {
currentTx = c.newEmptyTX()
}
return f(ctx, currentTx)
case PropagationMandatory:
@ -160,7 +166,7 @@ func (c *Core) TransactionWithOptions(
case PropagationNotSupported:
ctx = WithoutTX(ctx, group)
return f(ctx, nil)
return f(ctx, c.newEmptyTX())
case PropagationNever:
if currentTx != nil {
@ -169,22 +175,12 @@ func (c *Core) TransactionWithOptions(
"transaction propagation NEVER cannot run within an existing transaction",
)
}
return f(ctx, nil)
ctx = WithoutTX(ctx, group)
return f(ctx, c.newEmptyTX())
case PropagationNested:
if currentTx != nil {
// Create savepoint for nested transaction
if err = currentTx.Begin(); err != nil {
return err
}
defer func() {
if err != nil {
if rbErr := currentTx.Rollback(); rbErr != nil {
err = gerror.Wrap(err, rbErr.Error())
}
}
}()
return f(ctx, currentTx)
return currentTx.Transaction(ctx, f)
}
return c.createNewTransaction(ctx, opts, f)
@ -280,6 +276,10 @@ func TXFromCtx(ctx context.Context, group string) TX {
if tx.IsClosed() {
return nil
}
// no underlying sql tx.
if tx.GetSqlTX() == nil {
return nil
}
tx = tx.Ctx(ctx)
return tx
}

View File

@ -46,6 +46,12 @@ type TXCore struct {
cancelFunc context.CancelFunc
}
func (c *Core) newEmptyTX() TX {
return &TXCore{
db: c.db,
}
}
// transactionKeyForNestedPoint forms and returns the transaction key at current save point.
func (tx *TXCore) transactionKeyForNestedPoint() string {
return tx.db.GetCore().QuoteWord(
@ -427,5 +433,5 @@ func (tx *TXCore) IsOnMaster() bool {
// IsTransaction implements interface function Link.IsTransaction.
func (tx *TXCore) IsTransaction() bool {
return true
return tx != nil
}

View File

@ -103,7 +103,7 @@ func (c *Core) DoExec(ctx context.Context, link Link, sql string, args ...interf
return nil, err
}
} else if !link.IsTransaction() {
// If current link is not transaction link, it checks and retrieves transaction from context.
// If current link is not transaction link, it tries retrieving transaction object from context.
if tx := TXFromCtx(ctx, c.db.GetGroup()); tx != nil {
link = &txLink{tx.GetSqlTX()}
}

View File

@ -247,7 +247,9 @@ func (m *Model) doMappingAndFilterForInsertOrUpdateDataMap(data Map, allowOmitEm
// The parameter `master` specifies whether using the master node if master-slave configured.
func (m *Model) getLink(master bool) Link {
if m.tx != nil {
return &txLink{m.tx.GetSqlTX()}
if sqlTx := m.tx.GetSqlTX(); sqlTx != nil {
return &txLink{sqlTx}
}
}
linkType := m.linkType
if linkType == 0 {