diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 06c3256fc..3955d21bf 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -22,11 +22,8 @@ type GroupCache struct { } func NewGroupCache(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) *GroupCache { - rcClient := &rockscache.Client{ - Options: rockscache.Options{}, - } redisClient := NewRedisClient(rdb) - return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient} + return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, db: db, redisClient: redisClient} } func (g *GroupCache) getRedisClient() *RedisClient { diff --git a/pkg/common/db/model/group.go b/pkg/common/db/model/group.go index 3cdcc7d97..693d5225a 100644 --- a/pkg/common/db/model/group.go +++ b/pkg/common/db/model/group.go @@ -26,6 +26,8 @@ func NewGroupModel(db *mysql.Group, rdb redis.UniversalClient, mdb *mongo.Client DisableCacheRead: false, StrongConsistency: true, }) + sg := mdb.Database().Collection() + sg.Find() groupModel.mongo = mongoDB.NewMongoClient(mdb) return &groupModel } @@ -54,3 +56,20 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) { return g.cache.GetGroupInfoFromCache(ctx, groupID) } + +func (g *GroupModel) Update(ctx context.Context, groups []*mysql.Group) error { + err := g.db.DB.Transaction(func(tx *gorm.DB) error { + if err := g.db.Update(ctx, groups, tx); err != nil { + return err + } + var groupIDs []string + for _, group := range groups { + groupIDs = append(groupIDs, group.GroupID) + } + if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil { + return err + } + return nil + }) + return err +} diff --git a/pkg/common/db/mongoDB/super_group.go b/pkg/common/db/mongoDB/super_group.go index a28289b80..980e39490 100644 --- a/pkg/common/db/mongoDB/super_group.go +++ b/pkg/common/db/mongoDB/super_group.go @@ -1,5 +1,11 @@ package mongoDB +import ( + "Open_IM/pkg/utils" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + type SuperGroup struct { GroupID string `bson:"group_id" json:"groupID"` MemberIDList []string `bson:"member_id_list" json:"memberIDList"` @@ -9,3 +15,167 @@ type UserToSuperGroup struct { UserID string `bson:"user_id" json:"userID"` GroupIDList []string `bson:"group_id_list" json:"groupIDList"` } + +func New + +func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + superGroup := SuperGroup{ + GroupID: groupID, + MemberIDList: initMemberIDList, + } + _, err = c.InsertOne(sCtx, superGroup) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + var users []UserToSuperGroup + for _, v := range initMemberIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + //_, err = c.UpdateMany(sCtx, bson.M{"user_id": bson.M{"$in": initMemberIDList}}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + //if err != nil { + // session.AbortTransaction(ctx) + // return utils.Wrap(err, "transaction failed") + //} + for _, userID := range initMemberIDList { + _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + + } + return err +} + +func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + superGroup := SuperGroup{} + err := c.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup) + return superGroup, err +} + +func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + if err != nil { + return utils.Wrap(err, "start transaction failed") + } + _, err = c.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}}) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + var users []UserToSuperGroup + for _, v := range userIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + for _, userID := range userIDList { + _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + } + _ = session.CommitTransaction(ctx) + return err +} + +func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + _, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}}) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + _ = session.CommitTransaction(ctx) + return err +} + +func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + var user UserToSuperGroup + _ = c.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) + return user, nil +} + +func (d *db.DataBases) DeleteSuperGroup(groupID string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + superGroup := &SuperGroup{} + result := c.FindOneAndDelete(sCtx, bson.M{"group_id": groupID}) + err = result.Decode(superGroup) + if err != nil { + session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + if err = d.RemoveGroupFromUser(ctx, sCtx, groupID, superGroup.MemberIDList); err != nil { + session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + session.CommitTransaction(ctx) + return nil +} + +func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error { + var users []UserToSuperGroup + for _, v := range userIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + _, err := c.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}}) + if err != nil { + return utils.Wrap(err, "UpdateOne transaction failed") + } + return err +} \ No newline at end of file diff --git a/pkg/common/db/mysql/group_model_k.go b/pkg/common/db/mysql/group_model_k.go index 86bc49095..da05a1ee6 100644 --- a/pkg/common/db/mysql/group_model_k.go +++ b/pkg/common/db/mysql/group_model_k.go @@ -56,11 +56,11 @@ func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]i return utils.Wrap(GroupDB.Where("group_id = ?", groupID).Updates(args).Error, "") } -func (*Group) Update(ctx context.Context, groups []*Group) (err error) { +func (g *Group) Update(ctx context.Context, groups []*Group, tx ...*gorm.DB) (err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups) }() - return utils.Wrap(GroupDB.Updates(&groups).Error, "") + return utils.Wrap(getDBConn(g.DB, tx...).Updates(&groups).Error, "") } func (*Group) Find(ctx context.Context, groupIDs []string) (groups []*Group, err error) {