diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index ec439ebcd..ab91f8aa2 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -15,14 +15,22 @@ const GroupExpireTime = time.Second * 60 * 60 * 12 const groupInfoCacheKey = "GROUP_INFO_CACHE:" type GroupCache struct { - Client *Client - db *mysql.Group - expireTime time.Duration + db *mysql.Group + expireTime time.Duration + redisClient *RedisClient + rcClient *rockscache.Client } -func NewGroupRc(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) GroupCache { - rcClient := newClient(rdb, opts) - return GroupCache{Client: rcClient, expireTime: GroupExpireTime} +func NewGroupCache(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) *GroupCache { + rcClient := &rockscache.Client{ + Options: opts, + } + redisClient := NewRedisClient(rdb) + return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient} +} + +func (g *GroupCache) getRedisClient() *RedisClient { + return g.redisClient } func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { @@ -52,7 +60,7 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string) defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) }() - groupStr, err := g.Client.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) + groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) if err != nil { return nil, utils.Wrap(err, "") } @@ -64,7 +72,16 @@ func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string) defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) }() - return g.Client.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) + return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) +} + +func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error { + for _, groupID := range groupIDs { + if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil { + return err + } + } + return nil } func (g *GroupCache) getGroupInfoCacheKey(groupID string) string { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 649bf5a3d..ddb568e23 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -42,9 +42,10 @@ const ( exTypeKeyLocker = "EX_LOCK:" ) -func InitRedis(ctx context.Context) go_redis.UniversalClient { +func InitRedis() go_redis.UniversalClient { var rdb go_redis.UniversalClient var err error + ctx := context.Background() if config.Config.Redis.EnableCluster { rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ Addrs: config.Config.Redis.DBAddress, @@ -73,14 +74,14 @@ func InitRedis(ctx context.Context) go_redis.UniversalClient { return rdb } -func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient { - return &RedisClient{rdb: rdb} -} - type RedisClient struct { rdb go_redis.UniversalClient } +func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient { + return &RedisClient{rdb: rdb} +} + func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) { key := accountTempCode + account n, err := r.rdb.Exists(context.Background(), key).Result() diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 2e5babf1b..c17de7527 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -11,6 +11,7 @@ import ( "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" + "gorm.io/gorm" "math/big" "sort" "strconv" @@ -18,11 +19,11 @@ import ( ) const ( - userInfoCache = "USER_INFO_CACHE:" - friendRelationCache = "FRIEND_RELATION_CACHE:" - blackListCache = "BLACK_LIST_CACHE:" - groupCache = "GROUP_CACHE:" - groupInfoCache = "GROUP_INFO_CACHE:" + userInfoCache = "USER_INFO_CACHE:" + friendRelationCache = "FRIEND_RELATION_CACHE:" + blackListCache = "BLACK_LIST_CACHE:" + groupCache = "GROUP_CACHE:" + //groupInfoCache = "GROUP_INFO_CACHE:" groupOwnerIDCache = "GROUP_OWNER_ID:" joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" @@ -38,19 +39,10 @@ const ( ) const scanCount = 3000 - -type RcClient struct { - rdb redis.UniversalClient - Cache *rockscache.Client - ExpireTime time.Duration -} - -func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient { - return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime} -} +const RandomExpireAdjustment = 0.2 func (rc *RcClient) DelKeys() { - for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, + for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache, groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { fName := utils.GetSelfFuncName() var cursor uint64 @@ -80,7 +72,7 @@ func (rc *RcClient) DelKeys() { } } -func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { +func (rc *Client) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { getFriendIDList := func() (string, error) { friendIDList, err := mysql.GetFriendIDListByUserID(userID) if err != nil { @@ -364,36 +356,36 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID) } -func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { - getGroupInfo := func() (string, error) { - groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) - if err != nil { - return "", utils.Wrap(err, "") - } - bytes, err := json.Marshal(groupInfo) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - groupInfo = &mysql.Group{} - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) - }() - groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = json.Unmarshal([]byte(groupInfoStr), groupInfo) - return groupInfo, utils.Wrap(err, "") -} - -func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) - }() - return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) -} +//func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { +// getGroupInfo := func() (string, error) { +// groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) +// if err != nil { +// return "", utils.Wrap(err, "") +// } +// bytes, err := json.Marshal(groupInfo) +// if err != nil { +// return "", utils.Wrap(err, "") +// } +// return string(bytes), nil +// } +// groupInfo = &mysql.Group{} +// defer func() { +// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) +// }() +// groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) +// if err != nil { +// return nil, utils.Wrap(err, "") +// } +// err = json.Unmarshal([]byte(groupInfoStr), groupInfo) +// return groupInfo, utils.Wrap(err, "") +//} +// +//func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { +// defer func() { +// trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) +// }() +// return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) +//} func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) { getAllFriendInfo := func() (string, error) { diff --git a/pkg/common/db/model/group.go b/pkg/common/db/model/group.go index dd878ad98..c79113ef1 100644 --- a/pkg/common/db/model/group.go +++ b/pkg/common/db/model/group.go @@ -8,36 +8,30 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - //"github.com/dtm-labs/rockscache" + "github.com/dtm-labs/rockscache" "gorm.io/gorm" //"time" ) type GroupModel struct { - db *mysql.Group - cache *cache.GroupCache - mongo *mongo.Client + db *mysql.Group + cache *cache.GroupCache + mongo *mongo.Client } - -func NewGroupModel() { +func NewGroupModel() *GroupModel { var groupModel GroupModel - redisClient := cache.InitRedis() - rdb := cache.NewRedisClient(redisClient) groupModel.db = mysql.NewGroupDB() - //mgo := mongo.In() + groupModel.cache = cache.NewGroupCache(cache.InitRedis(), groupModel.db, rockscache.Options{ + DisableCacheRead: false, + StrongConsistency: true, + }) + groupModel.mongo = mongo.NewMongoClient() + return &groupModel } func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { - g.cache.Client. - for _, groupID := range groupIDs { - group, err := g.getGroupInfoFromCache(ctx, groupID) - if err != nil { - return nil, err - } - groups = append(groups, group) - } - return groups, nil + return g.cache.GetGroupsInfoFromCache(ctx, groupIDs) } func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error { @@ -45,47 +39,9 @@ func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error { } func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { - err := g.db.DB.Transaction(func(tx *gorm.DB) error { - if err := g.db.Delete(ctx, groupIDs, tx); err != nil { - return err - } - if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil { - return err - } - return nil - }) - return err + return g.cache.DelGroupsInfoFromCache(ctx, groupIDs) } -func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error { - for _, groupID := range groupIDs { - if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil { - return err - } - } - return nil -} - -func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { - getGroupInfo := func() (string, error) { - groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) - if err != nil { - return "", utils.Wrap(err, "") - } - bytes, err := json.Marshal(groupInfo) - if err != nil { - return "", utils.Wrap(err, "") - } - return string(bytes), nil - } - groupInfo = &mysql.Group{} - defer func() { - trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) - }() - groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = json.Unmarshal([]byte(groupInfoStr), groupInfo) - return groupInfo, utils.Wrap(err, "") +func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) { + return g.cache.GetGroupInfoFromCache(ctx, groupID) } diff --git a/pkg/common/db/model/user.go b/pkg/common/db/model/user.go index 8a88b99cf..26ea4b2a3 100644 --- a/pkg/common/db/model/user.go +++ b/pkg/common/db/model/user.go @@ -9,9 +9,10 @@ type UserModel struct { db *mysql.User } -func NewGroupUser(ctx context.Context) { +func NewGroupUser(ctx context.Context) *UserModel { var userModel UserModel userModel.db = mysql.NewUserDB() + return &userModel } func (u *UserModel) Find(ctx context.Context, userIDs []string) (users []*mysql.User, err error) { diff --git a/pkg/common/db/mongo/init_mongo.go b/pkg/common/db/mongo/init_mongo.go index b61991506..3ffb0adb0 100644 --- a/pkg/common/db/mongo/init_mongo.go +++ b/pkg/common/db/mongo/init_mongo.go @@ -13,7 +13,17 @@ import ( "time" ) -func InitMongoClient() *mongo.Client { +type Client struct { + mongo *mongo.Client +} + +func NewMongoClient() *Client { + var client Client + client.mongo = initMongo() + return &client +} + +func initMongo() *mongo.Client { uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" if config.Config.Mongo.DBUri != "" { // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize diff --git a/pkg/common/db/mysql/group_model_k.go b/pkg/common/db/mysql/group_model_k.go index b6797109a..86bc49095 100644 --- a/pkg/common/db/mysql/group_model_k.go +++ b/pkg/common/db/mysql/group_model_k.go @@ -29,7 +29,8 @@ type Group struct { func NewGroupDB() *Group { var group Group - group.DB = initMysqlDB(&group) + db := ConnectToDB() + db = InitModel(db, &group) return &group } @@ -41,11 +42,11 @@ func (*Group) Create(ctx context.Context, groups []*Group) (err error) { return err } -func (*Group) Delete(ctx context.Context, groupIDs []string) (err error) { +func (g *Group) Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) }() - return utils.Wrap(GroupDB.Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "") + return utils.Wrap(getDBConn(g.DB, tx...).Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "") } func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) {