This commit is contained in:
wangchuxiao 2023-01-17 16:36:34 +08:00
parent 1d428785d1
commit 3751e633e4
7 changed files with 102 additions and 124 deletions

View File

@ -15,14 +15,22 @@ const GroupExpireTime = time.Second * 60 * 60 * 12
const groupInfoCacheKey = "GROUP_INFO_CACHE:" const groupInfoCacheKey = "GROUP_INFO_CACHE:"
type GroupCache struct { type GroupCache struct {
Client *Client db *mysql.Group
db *mysql.Group expireTime time.Duration
expireTime time.Duration redisClient *RedisClient
rcClient *rockscache.Client
} }
func NewGroupRc(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) GroupCache { func NewGroupCache(rdb redis.UniversalClient, db *mysql.Group, opts rockscache.Options) *GroupCache {
rcClient := newClient(rdb, opts) rcClient := &rockscache.Client{
return GroupCache{Client: rcClient, expireTime: GroupExpireTime} Options: opts,
}
redisClient := NewRedisClient(rdb)
return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient}
}
func (g *GroupCache) getRedisClient() *RedisClient {
return g.redisClient
} }
func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { func (g *GroupCache) GetGroupsInfoFromCache(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) {
@ -52,7 +60,7 @@ func (g *GroupCache) GetGroupInfoFromCache(ctx context.Context, groupID string)
defer func() { defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
}() }()
groupStr, err := g.Client.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup)
if err != nil { if err != nil {
return nil, utils.Wrap(err, "") return nil, utils.Wrap(err, "")
} }
@ -64,7 +72,16 @@ func (g *GroupCache) DelGroupInfoFromCache(ctx context.Context, groupID string)
defer func() { defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}() }()
return g.Client.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID))
}
func (g *GroupCache) DelGroupsInfoFromCache(ctx context.Context, groupIDs []string) error {
for _, groupID := range groupIDs {
if err := g.DelGroupInfoFromCache(ctx, groupID); err != nil {
return err
}
}
return nil
} }
func (g *GroupCache) getGroupInfoCacheKey(groupID string) string { func (g *GroupCache) getGroupInfoCacheKey(groupID string) string {

View File

@ -42,9 +42,10 @@ const (
exTypeKeyLocker = "EX_LOCK:" exTypeKeyLocker = "EX_LOCK:"
) )
func InitRedis(ctx context.Context) go_redis.UniversalClient { func InitRedis() go_redis.UniversalClient {
var rdb go_redis.UniversalClient var rdb go_redis.UniversalClient
var err error var err error
ctx := context.Background()
if config.Config.Redis.EnableCluster { if config.Config.Redis.EnableCluster {
rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress, Addrs: config.Config.Redis.DBAddress,
@ -73,14 +74,14 @@ func InitRedis(ctx context.Context) go_redis.UniversalClient {
return rdb return rdb
} }
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
}
type RedisClient struct { type RedisClient struct {
rdb go_redis.UniversalClient rdb go_redis.UniversalClient
} }
func NewRedisClient(rdb go_redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb}
}
func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) { func (r *RedisClient) JudgeAccountEXISTS(account string) (bool, error) {
key := accountTempCode + account key := accountTempCode + account
n, err := r.rdb.Exists(context.Background(), key).Result() n, err := r.rdb.Exists(context.Background(), key).Result()

View File

@ -11,6 +11,7 @@ import (
"encoding/json" "encoding/json"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"gorm.io/gorm"
"math/big" "math/big"
"sort" "sort"
"strconv" "strconv"
@ -18,11 +19,11 @@ import (
) )
const ( const (
userInfoCache = "USER_INFO_CACHE:" userInfoCache = "USER_INFO_CACHE:"
friendRelationCache = "FRIEND_RELATION_CACHE:" friendRelationCache = "FRIEND_RELATION_CACHE:"
blackListCache = "BLACK_LIST_CACHE:" blackListCache = "BLACK_LIST_CACHE:"
groupCache = "GROUP_CACHE:" groupCache = "GROUP_CACHE:"
groupInfoCache = "GROUP_INFO_CACHE:" //groupInfoCache = "GROUP_INFO_CACHE:"
groupOwnerIDCache = "GROUP_OWNER_ID:" groupOwnerIDCache = "GROUP_OWNER_ID:"
joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
@ -38,19 +39,10 @@ const (
) )
const scanCount = 3000 const scanCount = 3000
const RandomExpireAdjustment = 0.2
type RcClient struct {
rdb redis.UniversalClient
Cache *rockscache.Client
ExpireTime time.Duration
}
func NewRcClient(rdb redis.UniversalClient, expireTime time.Duration, opts rockscache.Options) *RcClient {
return &RcClient{Cache: rockscache.NewClient(rdb, opts), ExpireTime: expireTime}
}
func (rc *RcClient) DelKeys() { func (rc *RcClient) DelKeys() {
for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCache, groupOwnerIDCache, joinedGroupListCache, for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache,
groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} {
fName := utils.GetSelfFuncName() fName := utils.GetSelfFuncName()
var cursor uint64 var cursor uint64
@ -80,7 +72,7 @@ func (rc *RcClient) DelKeys() {
} }
} }
func (rc *RcClient) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) { func (rc *Client) GetFriendIDListFromCache(ctx context.Context, userID string) (friendIDList []string, err error) {
getFriendIDList := func() (string, error) { getFriendIDList := func() (string, error) {
friendIDList, err := mysql.GetFriendIDListByUserID(userID) friendIDList, err := mysql.GetFriendIDListByUserID(userID)
if err != nil { if err != nil {
@ -364,36 +356,36 @@ func DelAllGroupMembersInfoFromCache(ctx context.Context, groupID string) (err e
return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID) return db.DB.Rc.TagAsDeleted(groupAllMemberInfoCache + groupID)
} }
func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) { //func GetGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
getGroupInfo := func() (string, error) { // getGroupInfo := func() (string, error) {
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID) // groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
if err != nil { // if err != nil {
return "", utils.Wrap(err, "") // return "", utils.Wrap(err, "")
} // }
bytes, err := json.Marshal(groupInfo) // bytes, err := json.Marshal(groupInfo)
if err != nil { // if err != nil {
return "", utils.Wrap(err, "") // return "", utils.Wrap(err, "")
} // }
return string(bytes), nil // return string(bytes), nil
} // }
groupInfo = &mysql.Group{} // groupInfo = &mysql.Group{}
defer func() { // defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo) // trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
}() // }()
groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo) // groupInfoStr, err := db.DB.Rc.Fetch(groupInfoCache+groupID, time.Second*30*60, getGroupInfo)
if err != nil { // if err != nil {
return nil, utils.Wrap(err, "") // return nil, utils.Wrap(err, "")
} // }
err = json.Unmarshal([]byte(groupInfoStr), groupInfo) // err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
return groupInfo, utils.Wrap(err, "") // return groupInfo, utils.Wrap(err, "")
} //}
//
func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) { //func DelGroupInfoFromCache(ctx context.Context, groupID string) (err error) {
defer func() { // defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) // trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}() // }()
return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID) // return db.DB.Rc.TagAsDeleted(groupInfoCache + groupID)
} //}
func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) { func GetAllFriendsInfoFromCache(ctx context.Context, userID string) (friends []*mysql.Friend, err error) {
getAllFriendInfo := func() (string, error) { getAllFriendInfo := func() (string, error) {

View File

@ -8,36 +8,30 @@ import (
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"encoding/json" "encoding/json"
//"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"gorm.io/gorm" "gorm.io/gorm"
//"time" //"time"
) )
type GroupModel struct { type GroupModel struct {
db *mysql.Group db *mysql.Group
cache *cache.GroupCache cache *cache.GroupCache
mongo *mongo.Client mongo *mongo.Client
} }
func NewGroupModel() *GroupModel {
func NewGroupModel() {
var groupModel GroupModel var groupModel GroupModel
redisClient := cache.InitRedis()
rdb := cache.NewRedisClient(redisClient)
groupModel.db = mysql.NewGroupDB() groupModel.db = mysql.NewGroupDB()
//mgo := mongo.In() groupModel.cache = cache.NewGroupCache(cache.InitRedis(), groupModel.db, rockscache.Options{
DisableCacheRead: false,
StrongConsistency: true,
})
groupModel.mongo = mongo.NewMongoClient()
return &groupModel
} }
func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) { func (g *GroupModel) Find(ctx context.Context, groupIDs []string) (groups []*mysql.Group, err error) {
g.cache.Client. return g.cache.GetGroupsInfoFromCache(ctx, groupIDs)
for _, groupID := range groupIDs {
group, err := g.getGroupInfoFromCache(ctx, groupID)
if err != nil {
return nil, err
}
groups = append(groups, group)
}
return groups, nil
} }
func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error { func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error {
@ -45,47 +39,9 @@ func (g *GroupModel) Create(ctx context.Context, groups []*mysql.Group) error {
} }
func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error {
err := g.db.DB.Transaction(func(tx *gorm.DB) error { return g.cache.DelGroupsInfoFromCache(ctx, groupIDs)
if err := g.db.Delete(ctx, groupIDs, tx); err != nil {
return err
}
if err := g.deleteGroupsInCache(ctx, groupIDs); err != nil {
return err
}
return nil
})
return err
} }
func (g *GroupModel) deleteGroupsInCache(ctx context.Context, groupIDs []string) error { func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) {
for _, groupID := range groupIDs { return g.cache.GetGroupInfoFromCache(ctx, groupID)
if err := g.weakRc.Cache.TagAsDeleted(g.getGroupCacheKey(groupID)); err != nil {
return err
}
}
return nil
}
func (g *GroupModel) getGroupInfoFromCache(ctx context.Context, groupID string) (groupInfo *mysql.Group, err error) {
getGroupInfo := func() (string, error) {
groupInfo, err := mysql.GetGroupInfoByGroupID(groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(groupInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
groupInfo = &mysql.Group{}
defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupInfo", groupInfo)
}()
groupInfoStr, err := g.weakRc.Cache.Fetch(groupInfoCache+groupID, GroupExpireTime, getGroupInfo)
if err != nil {
return nil, utils.Wrap(err, "")
}
err = json.Unmarshal([]byte(groupInfoStr), groupInfo)
return groupInfo, utils.Wrap(err, "")
} }

View File

@ -9,9 +9,10 @@ type UserModel struct {
db *mysql.User db *mysql.User
} }
func NewGroupUser(ctx context.Context) { func NewGroupUser(ctx context.Context) *UserModel {
var userModel UserModel var userModel UserModel
userModel.db = mysql.NewUserDB() userModel.db = mysql.NewUserDB()
return &userModel
} }
func (u *UserModel) Find(ctx context.Context, userIDs []string) (users []*mysql.User, err error) { func (u *UserModel) Find(ctx context.Context, userIDs []string) (users []*mysql.User, err error) {

View File

@ -13,7 +13,17 @@ import (
"time" "time"
) )
func InitMongoClient() *mongo.Client { type Client struct {
mongo *mongo.Client
}
func NewMongoClient() *Client {
var client Client
client.mongo = initMongo()
return &client
}
func initMongo() *mongo.Client {
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.DBUri != "" { if config.Config.Mongo.DBUri != "" {
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize

View File

@ -29,7 +29,8 @@ type Group struct {
func NewGroupDB() *Group { func NewGroupDB() *Group {
var group Group var group Group
group.DB = initMysqlDB(&group) db := ConnectToDB()
db = InitModel(db, &group)
return &group return &group
} }
@ -41,11 +42,11 @@ func (*Group) Create(ctx context.Context, groups []*Group) (err error) {
return err return err
} }
func (*Group) Delete(ctx context.Context, groupIDs []string) (err error) { func (g *Group) Delete(ctx context.Context, groupIDs []string, tx ...*gorm.DB) (err error) {
defer func() { defer func() {
trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs)
}() }()
return utils.Wrap(GroupDB.Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "") return utils.Wrap(getDBConn(g.DB, tx...).Where("group_id in (?)", groupIDs).Delete(&Group{}).Error, "")
} }
func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) { func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) {