From 945449108d92d32ea3acf97cd449c6a0cefe8108 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Sun, 29 Jan 2023 11:15:16 +0800 Subject: [PATCH] errcode --- pkg/common/db/controller/group.go | 26 ++++++++-- pkg/common/db/unrelation/init_mongo.go | 4 +- pkg/common/db/unrelation/super_group.go | 67 ++++++++++--------------- 3 files changed, 51 insertions(+), 46 deletions(-) diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 2bf0106b1..f2d8f53a4 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -27,7 +27,7 @@ type GroupController struct { database DataBase } -func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) GroupInterface { +func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) GroupInterface { groupController := &GroupController{database: newGroupDatabase(db, rdb, mgoDB)} return groupController } @@ -69,19 +69,22 @@ type GroupDataBase struct { groupDB *relation.Group groupMemberDB *relation.GroupMember groupRequestDB *relation.GroupRequest + db *gorm.DB cache *cache.GroupCache mongoDB *unrelation.SuperGroupMgoDB } -func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Database) DataBase { +func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) DataBase { groupDB := relation.NewGroupDB(db) groupMemberDB := relation.NewGroupMemberDB(db) groupRequestDB := relation.NewGroupRequest(db) + newDB := db database := &GroupDataBase{ groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, + db: newDB, cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, rockscache.Options{ RandomExpireAdjustment: 0.2, DisableCacheRead: false, @@ -118,7 +121,7 @@ func (g *GroupDataBase) TakeGroupByID(ctx context.Context, groupID string) (grou } func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) error { - return g.groupDB.DB.Transaction(func(tx *gorm.DB) error { + return g.db.Transaction(func(tx *gorm.DB) error { if err := g.groupDB.Update(ctx, groups, tx); err != nil { return err } @@ -134,7 +137,22 @@ func (g *GroupDataBase) Update(ctx context.Context, groups []*relation.Group) er } func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int) error { - return g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList, memberNumCount, g.cache.DelJoinedSuperGroupIDs) + sess, err := g.mongoDB.MgoClient.StartSession() + if err != nil { + return err + } + defer sess.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, sess) + if err = g.mongoDB.CreateSuperGroup(sCtx, groupID, initMemberIDList, memberNumCount); err != nil { + _ = sess.AbortTransaction(ctx) + return err + } + + if err = g.cache.DelJoinedSuperGroupIDs(ctx, initMemberIDList); err != nil { + _ = sess.AbortTransaction(ctx) + return err + } + return sess.CommitTransaction(ctx) } func (g *GroupDataBase) GetSuperGroupByID(ctx context.Context, groupID string) (superGroup *unrelation.SuperGroup, err error) { diff --git a/pkg/common/db/unrelation/init_mongo.go b/pkg/common/db/unrelation/init_mongo.go index efd75c623..d5c1c9ef1 100644 --- a/pkg/common/db/unrelation/init_mongo.go +++ b/pkg/common/db/unrelation/init_mongo.go @@ -14,7 +14,7 @@ import ( ) type Mongo struct { - DB *mongo.Database + DB *mongo.Client } func (m *Mongo) InitMongo() { @@ -54,7 +54,7 @@ func (m *Mongo) InitMongo() { panic(err.Error() + " mongo.Connect failed " + uri) } } - m.DB = mongoClient.Database(config.Config.Mongo.DBDatabase) + m.DB = mongoClient } func (m *Mongo) CreateTagIndex() { diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index 326aba1b6..5358f7b4c 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -1,6 +1,7 @@ package unrelation import ( + "Open_IM/pkg/common/config" "Open_IM/pkg/utils" "context" "go.mongodb.org/mongo-driver/bson" @@ -15,7 +16,8 @@ const ( ) type SuperGroupMgoDB struct { - mgoDB *mongo.Database + MgoClient *mongo.Client + MgoDB *mongo.Database superGroupCollection *mongo.Collection userToSuperGroupCollection *mongo.Collection } @@ -30,47 +32,32 @@ type UserToSuperGroup struct { GroupIDList []string `bson:"group_id_list" json:"groupIDList"` } -func NewSuperGroupMgoDB(mgoDB *mongo.Database) *SuperGroupMgoDB { - return &SuperGroupMgoDB{mgoDB: mgoDB, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)} +func NewSuperGroupMgoDB(mgoClient *mongo.Client) *SuperGroupMgoDB { + mgoDB := mgoClient.Database(config.Config.Mongo.DBDatabase) + return &SuperGroupMgoDB{MgoDB: mgoDB, MgoClient: mgoClient, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)} } -func (db *SuperGroupMgoDB) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string, memberNumCount int, cacheFunc func(ctx context.Context, userIDs []string) error) error { - //ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - //c := db.mgoDB.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { - err := sCtx.StartTransaction() +func (db *SuperGroupMgoDB) CreateSuperGroup(sCtx mongo.SessionContext, groupID string, initMemberIDList []string, memberNumCount int) error { + superGroup := SuperGroup{ + GroupID: groupID, + MemberIDList: initMemberIDList, + } + _, err := db.superGroupCollection.InsertOne(sCtx, superGroup) + if err != nil { + return err + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + for _, userID := range initMemberIDList { + _, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) if err != nil { return err } - superGroup := SuperGroup{ - GroupID: groupID, - MemberIDList: initMemberIDList, - } - _, err = db.superGroupCollection.InsertOne(sCtx, superGroup) - if err != nil { - _ = sCtx.AbortTransaction(ctx) - return err - } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - for _, userID := range initMemberIDList { - _, err = db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) - if err != nil { - _ = sCtx.AbortTransaction(ctx) - return err - } - } - if cacheFunc != nil { - if err = cacheFunc(ctx, initMemberIDList); err != nil { - _ = sCtx.AbortTransaction(ctx) - return err - } - } - return sCtx.CommitTransaction(ctx) - }) + } + return nil + } func (db *SuperGroupMgoDB) GetSuperGroup(ctx context.Context, groupID string) (*SuperGroup, error) { @@ -81,7 +68,7 @@ func (db *SuperGroupMgoDB) GetSuperGroup(ctx context.Context, groupID string) (* func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID string, userIDList []string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { + return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { _, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}}) if err != nil { _ = sCtx.AbortTransaction(ctx) @@ -104,7 +91,7 @@ func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID stri func (db *SuperGroupMgoDB) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDList []string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { + return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { _, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}}) if err != nil { _ = sCtx.AbortTransaction(ctx) @@ -127,7 +114,7 @@ func (db *SuperGroupMgoDB) GetSuperGroupByUserID(ctx context.Context, userID str func (db *SuperGroupMgoDB) DeleteSuperGroup(ctx context.Context, groupID string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) - return db.mgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { + return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { superGroup := &SuperGroup{} _, err := db.superGroupCollection.DeleteOne(sCtx, bson.M{"group_id": groupID}) if err != nil {