From 9a61a6970f6144cfe64a6f1d660b42a5a3d39168 Mon Sep 17 00:00:00 2001 From: John Guo Date: Mon, 17 Mar 2025 14:50:07 +0800 Subject: [PATCH] fix(database/gdb): fix transaction propagation feature (#4199) --- .../mysql/mysql_z_unit_transaction_test.go | 60 +++++++++++++++---- database/gdb/gdb_core_transaction.go | 42 ++++++------- database/gdb/gdb_core_txcore.go | 8 ++- database/gdb/gdb_core_underlying.go | 2 +- database/gdb/gdb_model_utility.go | 4 +- 5 files changed, 80 insertions(+), 36 deletions(-) diff --git a/contrib/drivers/mysql/mysql_z_unit_transaction_test.go b/contrib/drivers/mysql/mysql_z_unit_transaction_test.go index e9c3315b4..0c583d964 100644 --- a/contrib/drivers/mysql/mysql_z_unit_transaction_test.go +++ b/contrib/drivers/mysql/mysql_z_unit_transaction_test.go @@ -1286,8 +1286,7 @@ func Test_Transaction_Propagation(t *testing.T) { Propagation: gdb.PropagationNotSupported, }, func(ctx context.Context, tx2 gdb.TX) error { // Should execute without transaction - t.Assert(tx2, nil) - _, err := db.Insert(ctx, table, g.Map{ + _, err = db.Insert(ctx, table, g.Map{ "id": 9, "passport": "non_tx_record", }) @@ -1346,8 +1345,6 @@ func Test_Transaction_Propagation(t *testing.T) { err := db.TransactionWithOptions(ctx, gdb.TxOptions{ Propagation: gdb.PropagationNever, }, func(ctx context.Context, tx gdb.TX) error { - // Should execute without transaction - t.Assert(tx, nil) _, err := db.Insert(ctx, table, g.Map{ "id": 11, "passport": "never", @@ -1369,6 +1366,51 @@ func Test_Transaction_Propagation(t *testing.T) { }) } +func Test_Transaction_Propagation_PropagationSupports(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // scenario1: when in a transaction, use PropagationSupports to execute a transaction + err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + // insert in outer tx. + _, err := tx.Insert(table, g.Map{ + "id": 1, + }) + if err != nil { + return err + } + err = tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationSupports, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Insert(table, g.Map{ + "id": 2, + }) + return gerror.New("error") + }) + return err + }) + t.AssertNE(err, nil) + + // scenario2: when not in a transaction, do not use transaction but direct db link. + err = db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationSupports, + }, func(ctx context.Context, tx gdb.TX) error { + _, err = tx.Insert(table, g.Map{ + "id": 3, + }) + return err + }) + t.AssertNil(err) + + // 查询结果 + result, err := db.Model(table).OrderAsc("id").All() + t.AssertNil(err) + t.Assert(len(result), 1) + t.Assert(result[0]["id"], 3) + }) +} + func Test_Transaction_Propagation_Complex(t *testing.T) { gtest.C(t, func(t *gtest.T) { table1 := createTable() @@ -1389,7 +1431,7 @@ func Test_Transaction_Propagation_Complex(t *testing.T) { err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ Propagation: gdb.PropagationNested, }, func(ctx context.Context, tx2 gdb.TX) error { - _, err := tx2.Insert(table1, g.Map{ + _, err = tx2.Insert(table1, g.Map{ "id": 2, "passport": "nested1", }) @@ -1427,10 +1469,7 @@ func Test_Transaction_Propagation_Complex(t *testing.T) { err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ Propagation: gdb.PropagationNotSupported, }, func(ctx context.Context, tx2 gdb.TX) error { - // Should execute without transaction - t.Assert(tx2, nil) - - _, err := db.Insert(ctx, table2, g.Map{ + _, err = db.Insert(ctx, table2, g.Map{ "id": 5, "passport": "not_supported", }) @@ -1489,9 +1528,6 @@ func Test_Transaction_Propagation_Complex(t *testing.T) { err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ Propagation: gdb.PropagationNotSupported, }, func(ctx context.Context, tx2 gdb.TX) error { - // Should execute without transaction - t.Assert(tx2, nil) - // Start a new independent transaction return db.Transaction(ctx, func(ctx context.Context, tx3 gdb.TX) error { _, err := tx3.Insert(table, g.Map{ diff --git a/database/gdb/gdb_core_transaction.go b/database/gdb/gdb_core_transaction.go index 1faa73a62..c4cbfa616 100644 --- a/database/gdb/gdb_core_transaction.go +++ b/database/gdb/gdb_core_transaction.go @@ -19,9 +19,15 @@ import ( type Propagation string const ( + // PropagationNested starts a nested transaction if already in a transaction, + // or behaves like PropagationRequired if not in a transaction. + // + // It is the default behavior. + PropagationNested Propagation = "NESTED" + // PropagationRequired starts a new transaction if not in a transaction, // or uses the existing transaction if already in a transaction. - PropagationRequired Propagation = "" // REQUIRED + PropagationRequired Propagation = "REQUIRED" // PropagationSupports executes within the existing transaction if present, // otherwise executes without transaction. @@ -30,10 +36,6 @@ const ( // PropagationRequiresNew starts a new transaction, and suspends the current transaction if one exists. PropagationRequiresNew Propagation = "REQUIRES_NEW" - // PropagationNested starts a nested transaction if already in a transaction, - // or behaves like PropagationRequired if not in a transaction. - PropagationNested Propagation = "NESTED" - // PropagationNotSupported executes non-transactional, suspends any existing transaction. PropagationNotSupported Propagation = "NOT_SUPPORTED" @@ -66,7 +68,8 @@ var transactionIdGenerator = gtype.NewUint64() // DefaultTxOptions returns the default transaction options. func DefaultTxOptions() TxOptions { return TxOptions{ - Propagation: PropagationRequired, + // Note the default propagation type is PropagationNested not PropagationRequired. + Propagation: PropagationNested, } } @@ -138,11 +141,14 @@ func (c *Core) TransactionWithOptions( switch opts.Propagation { case PropagationRequired: if currentTx != nil { - return currentTx.Transaction(ctx, f) + return f(ctx, currentTx) } return c.createNewTransaction(ctx, opts, f) case PropagationSupports: + if currentTx == nil { + currentTx = c.newEmptyTX() + } return f(ctx, currentTx) case PropagationMandatory: @@ -160,7 +166,7 @@ func (c *Core) TransactionWithOptions( case PropagationNotSupported: ctx = WithoutTX(ctx, group) - return f(ctx, nil) + return f(ctx, c.newEmptyTX()) case PropagationNever: if currentTx != nil { @@ -169,22 +175,12 @@ func (c *Core) TransactionWithOptions( "transaction propagation NEVER cannot run within an existing transaction", ) } - return f(ctx, nil) + ctx = WithoutTX(ctx, group) + return f(ctx, c.newEmptyTX()) case PropagationNested: if currentTx != nil { - // Create savepoint for nested transaction - if err = currentTx.Begin(); err != nil { - return err - } - defer func() { - if err != nil { - if rbErr := currentTx.Rollback(); rbErr != nil { - err = gerror.Wrap(err, rbErr.Error()) - } - } - }() - return f(ctx, currentTx) + return currentTx.Transaction(ctx, f) } return c.createNewTransaction(ctx, opts, f) @@ -280,6 +276,10 @@ func TXFromCtx(ctx context.Context, group string) TX { if tx.IsClosed() { return nil } + // no underlying sql tx. + if tx.GetSqlTX() == nil { + return nil + } tx = tx.Ctx(ctx) return tx } diff --git a/database/gdb/gdb_core_txcore.go b/database/gdb/gdb_core_txcore.go index 79154068f..492d13380 100644 --- a/database/gdb/gdb_core_txcore.go +++ b/database/gdb/gdb_core_txcore.go @@ -46,6 +46,12 @@ type TXCore struct { cancelFunc context.CancelFunc } +func (c *Core) newEmptyTX() TX { + return &TXCore{ + db: c.db, + } +} + // transactionKeyForNestedPoint forms and returns the transaction key at current save point. func (tx *TXCore) transactionKeyForNestedPoint() string { return tx.db.GetCore().QuoteWord( @@ -427,5 +433,5 @@ func (tx *TXCore) IsOnMaster() bool { // IsTransaction implements interface function Link.IsTransaction. func (tx *TXCore) IsTransaction() bool { - return true + return tx != nil } diff --git a/database/gdb/gdb_core_underlying.go b/database/gdb/gdb_core_underlying.go index d7d8d1c51..481e870e8 100644 --- a/database/gdb/gdb_core_underlying.go +++ b/database/gdb/gdb_core_underlying.go @@ -103,7 +103,7 @@ func (c *Core) DoExec(ctx context.Context, link Link, sql string, args ...interf return nil, err } } else if !link.IsTransaction() { - // If current link is not transaction link, it checks and retrieves transaction from context. + // If current link is not transaction link, it tries retrieving transaction object from context. if tx := TXFromCtx(ctx, c.db.GetGroup()); tx != nil { link = &txLink{tx.GetSqlTX()} } diff --git a/database/gdb/gdb_model_utility.go b/database/gdb/gdb_model_utility.go index 353239623..b9782f7fa 100644 --- a/database/gdb/gdb_model_utility.go +++ b/database/gdb/gdb_model_utility.go @@ -247,7 +247,9 @@ func (m *Model) doMappingAndFilterForInsertOrUpdateDataMap(data Map, allowOmitEm // The parameter `master` specifies whether using the master node if master-slave configured. func (m *Model) getLink(master bool) Link { if m.tx != nil { - return &txLink{m.tx.GetSqlTX()} + if sqlTx := m.tx.GetSqlTX(); sqlTx != nil { + return &txLink{sqlTx} + } } linkType := m.linkType if linkType == 0 {