From 4c931fba5c55f6b42680ce87ed1b4ee3e0870cc9 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 31 Jan 2023 18:55:22 +0800 Subject: [PATCH] errcode --- internal/cron_task/cron_task.go | 4 + internal/rpc/admin_cms/admin_cms.go | 24 +- pkg/common/db/cache/group.go | 312 +++++++++++++++++++++-- pkg/common/db/cache/rockscache.go | 34 +-- pkg/common/db/cache/user.go | 90 +++++++ pkg/common/db/controller/chatlog.go | 49 ++++ pkg/common/db/controller/group.go | 13 +- pkg/common/db/relation/chat_log_model.go | 14 +- pkg/common/db/relation/file_model.go | 50 ---- pkg/common/db/unrelation/super_group.go | 4 +- pkg/utils/local_cache.go | 78 ------ 11 files changed, 491 insertions(+), 181 deletions(-) create mode 100644 pkg/common/db/cache/user.go create mode 100644 pkg/common/db/controller/chatlog.go delete mode 100644 pkg/common/db/relation/file_model.go delete mode 100644 pkg/utils/local_cache.go diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index c261fc48b..7da3c9d90 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -3,6 +3,7 @@ package cronTask import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" @@ -45,6 +46,9 @@ func StartCronTask(userID, workingGroupID string) { } type CronTask struct { + spec string + groupInterface controller.GroupInterface + userInterface controller.UserInterface } func getCronTaskOperationID() string { diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index b1448693d..28b641a34 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -3,6 +3,7 @@ package admin_cms import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/log" @@ -36,16 +37,26 @@ type adminCMSServer struct { adminCMSInterface controller.AdminCMSInterface groupInterface controller.GroupInterface userInterface controller.UserInterface + chatLogInterface controller.ChatLogInterface } func NewAdminCMSServer(port int) *adminCMSServer { log.NewPrivateLog(constant.LogFileName) - return &adminCMSServer{ + admin := &adminCMSServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImAdminCMSName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } + var mysql relation.Mysql + var redis cache.RedisClient + mysql.InitConn() + redis.InitRedis() + admin.userInterface = controller.NewUserController(mysql.GormConn()) + admin.groupInterface = controller.NewGroupController(mysql.GormConn(), redis.GetClient(), nil) + admin.adminCMSInterface = controller.NewAdminCMSController(mysql.GormConn()) + admin.chatLogInterface = controller.NewChatLogController(mysql.GormConn()) + return admin } func (s *adminCMSServer) Run() { @@ -125,18 +136,15 @@ func (s *adminCMSServer) AdminLogin(ctx context.Context, req *pbAdminCMS.AdminLo } resp.UserName = admin.Nickname resp.FaceURL = admin.FaceURL - log.NewInfo(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "resp: ", resp.String()) return resp, nil } func (s *adminCMSServer) GetUserToken(ctx context.Context, req *pbAdminCMS.GetUserTokenReq) (*pbAdminCMS.GetUserTokenResp, error) { - resp := &pbAdminCMS.GetUserTokenResp{} token, expTime, err := token_verify.CreateToken(req.UserID, int(req.PlatformID)) if err != nil { - return resp, nil + return nil, err } - resp.Token = token - resp.ExpTime = expTime + resp := &pbAdminCMS.GetUserTokenResp{Token: token, ExpTime: expTime} return resp, nil } @@ -156,8 +164,7 @@ func (s *adminCMSServer) GetChatLogs(ctx context.Context, req *pbAdminCMS.GetCha } chatLog.SendTime = sendTime } - resp := &pbAdminCMS.GetChatLogsResp{} - num, chatLogs, err := relation.GetChatLog(&chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber, []int32{ + num, chatLogs, err := s.chatLogInterface.GetChatLog(&chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber, []int32{ constant.Text, constant.Picture, constant.Voice, @@ -177,6 +184,7 @@ func (s *adminCMSServer) GetChatLogs(ctx context.Context, req *pbAdminCMS.GetCha if err != nil { return nil, err } + resp := &pbAdminCMS.GetChatLogsResp{} resp.ChatLogsNum = int32(num) for _, chatLog := range chatLogs { pbChatLog := &pbAdminCMS.ChatLog{} diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 960a2b41c..50563ca88 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -1,36 +1,92 @@ package cache import ( + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" + "math/big" + "sort" + "strconv" + "sync" "time" ) -const GroupExpireTime = time.Second * 60 * 60 * 12 -const groupInfoCacheKey = "GROUP_INFO_CACHE:" +const ( + groupExpireTime = time.Second * 60 * 60 * 12 + groupInfoKey = "GROUP_INFO:" + groupMemberIDsKey = "GROUP_MEMBER_IDS:" + groupMembersHashKey = "GROUP_MEMBERS_HASH:" + groupMemberInfoKey = "GROUP_MEMBER_INFO:" + joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:" + joinedGroupsKey = "JOIN_GROUPS_KEY:" + groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" +) type GroupCache struct { group *relation.Group groupMember *relation.GroupMember groupRequest *relation.GroupRequest + mongoDB *unrelation.SuperGroupMgoDB expireTime time.Duration redisClient *RedisClient rcClient *rockscache.Client + + //local cache + cacheGroupMtx sync.RWMutex + cacheGroupMemberUserIDs map[string]*GroupMemberIDsHash } -func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, opts rockscache.Options) *GroupCache { - return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb)} +type GroupMemberIDsHash struct { + MemberListHash uint64 + UserIDs []string +} + +func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, mongoClient *unrelation.SuperGroupMgoDB, opts rockscache.Options) *GroupCache { + return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, + group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb), + mongoDB: mongoClient, cacheGroupMemberUserIDs: make(map[string]*GroupMemberIDsHash, 0), + } } func (g *GroupCache) getRedisClient() *RedisClient { return g.redisClient } +func (g *GroupCache) getGroupInfoKey(groupID string) string { + return groupInfoKey + groupID +} + +func (g *GroupCache) getJoinedSuperGroupsIDKey(userID string) string { + return joinedSuperGroupsKey + userID +} + +func (g *GroupCache) getJoinedGroupsKey(userID string) string { + return joinedGroupsKey + userID +} + +func (g *GroupCache) getGroupMembersHashKey(groupID string) string { + return groupMembersHashKey + groupID +} + +func (g *GroupCache) getGroupMemberIDsKey(groupID string) string { + return groupMemberIDsKey + groupID +} + +func (g *GroupCache) getGroupMemberInfoKey(groupID, userID string) string { + return groupMemberInfoKey + groupID + "-" + userID +} + +func (g *GroupCache) getGroupMemberNumKey(groupID string) string { + return groupMemberNumKey + groupID +} + +/// groupInfo func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) { for _, groupID := range groupIDs { group, err := g.GetGroupInfo(ctx, groupID) @@ -58,9 +114,9 @@ func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *r defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) }() - groupStr, err := g.rcClient.Fetch(g.getGroupInfoCacheKey(groupID), g.expireTime, getGroup) + groupStr, err := g.rcClient.Fetch(g.getGroupInfoKey(groupID), g.expireTime, getGroup) if err != nil { - return nil, utils.Wrap(err, "") + return nil, err } err = json.Unmarshal([]byte(groupStr), group) return group, utils.Wrap(err, "") @@ -70,7 +126,7 @@ func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err erro defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) }() - return g.rcClient.TagAsDeleted(g.getGroupInfoCacheKey(groupID)) + return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID)) } func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error { @@ -82,21 +138,247 @@ func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error return nil } -func (g *GroupCache) getGroupInfoCacheKey(groupID string) string { - return groupInfoCacheKey + groupID -} - -func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) { +// userJoinSuperGroup +func (g *GroupCache) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) { for _, userID := range userIDs { - if err := g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID); err != nil { + if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil { return err } } + return nil } -func (g *GroupCache) DelJoinedSuperGroupID(ctx context.Context, userID string) (err error) { +func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) }() - return g.rcClient.TagAsDeleted(joinedSuperGroupListCache + userID) + return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID)) +} + +func (g *GroupCache) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) { + getJoinedSuperGroupIDList := func() (string, error) { + userToSuperGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(userToSuperGroup.GroupIDList) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedSuperGroupIDs", joinedSuperGroupIDs) + }() + joinedSuperGroupListStr, err := g.rcClient.Fetch(g.getJoinedSuperGroupsIDKey(userID), time.Second*30*60, getJoinedSuperGroupIDList) + if err != nil { + return nil, err + } + err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupIDs) + return joinedSuperGroupIDs, utils.Wrap(err, "") +} + +// groupMembersHash +func (g *GroupCache) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) { + generateHash := func() (string, error) { + groupInfo, err := g.GetGroupInfo(ctx, groupID) + if err != nil { + return "", err + } + if groupInfo.Status == constant.GroupStatusDismissed { + return "0", nil + } + groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return "", err + } + sort.Strings(groupMemberIDList) + var all string + for _, v := range groupMemberIDList { + all += v + } + bi := big.NewInt(0) + bi.SetString(utils.Md5(all)[0:8], 16) + return strconv.Itoa(int(bi.Uint64())), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64) + }() + hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash) + if err != nil { + return 0, utils.Wrap(err, "fetch failed") + } + hashCode, err := strconv.Atoi(hashCodeStr) + return uint64(hashCode), err +} + +func (g *GroupCache) DelGroupMembersHash(ctx context.Context, groupID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) + }() + return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID)) +} + +// groupMemberIDs +// from redis +func (g *GroupCache) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { + f := func() (string, error) { + groupInfo, err := g.GetGroupInfo(ctx, groupID) + if err != nil { + return "", err + } + var groupMemberIDList []string + if groupInfo.GroupType == constant.SuperGroup { + superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID) + if err != nil { + return "", err + } + groupMemberIDList = superGroup.MemberIDList + } else { + groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID) + if err != nil { + return "", err + } + } + bytes, err := json.Marshal(groupMemberIDList) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs) + }() + groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f) + if err != nil { + return nil, err + } + err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs) + return groupMemberIDs, nil +} + +func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) + }() + return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID)) +} + +// from local map +func (g *GroupCache) LocalGetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { + remoteHash, err := g.GetGroupMembersHash(ctx, groupID) + if err != nil { + g.cacheGroupMtx.Lock() + defer g.cacheGroupMtx.Unlock() + delete(g.cacheGroupMemberUserIDs, groupID) + return nil, err + } + g.cacheGroupMtx.Lock() + defer g.cacheGroupMtx.Unlock() + if remoteHash == 0 { + delete(g.cacheGroupMemberUserIDs, groupID) + return []string{}, nil + } + localCache, ok := g.cacheGroupMemberUserIDs[groupID] + if ok && localCache.MemberListHash == remoteHash { + return localCache.UserIDs, nil + } + groupMemberIDsRemote, err := g.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + g.cacheGroupMemberUserIDs[groupID] = &GroupMemberIDsHash{ + MemberListHash: remoteHash, + UserIDs: groupMemberIDsRemote, + } + return groupMemberIDsRemote, nil +} + +// JoinedGroups +func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { + getJoinedGroupIDList := func() (string, error) { + joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(joinedGroupList) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs) + }() + joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList) + if err != nil { + return nil, err + } + err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs) + return joinedGroupIDs, utils.Wrap(err, "") +} + +func (g *GroupCache) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) + }() + return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID)) +} + +// GetGroupMemberInfo +func (g *GroupCache) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) { + getGroupMemberInfo := func() (string, error) { + groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(groupMemberInfo) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember) + }() + groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo) + if err != nil { + return nil, err + } + groupMember = &relation.GroupMember{} + err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember) + return groupMember, utils.Wrap(err, "") +} + +func (g *GroupCache) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID) + }() + return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID)) +} + +// groupMemberNum +func (g *GroupCache) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) { + getGroupMemberNum := func() (string, error) { + num, err := relation.GetGroupMemberNumByGroupID(groupID) + if err != nil { + return "", err + } + return strconv.Itoa(int(num)), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num) + }() + groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum) + if err != nil { + return 0, err + } + return strconv.Atoi(groupMember) +} + +func (g *GroupCache) DelGroupMemberNum(ctx context.Context, groupID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID) + }() + return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID)) } diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index ecd975db4..e17ea7c81 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/mongo" "Open_IM/pkg/common/db/mysql" + "Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/log" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" @@ -19,31 +20,32 @@ import ( ) const ( - userInfoCache = "USER_INFO_CACHE:" + //userInfoCache = "USER_INFO_CACHE:" friendRelationCache = "FRIEND_RELATION_CACHE:" blackListCache = "BLACK_LIST_CACHE:" - groupCache = "GROUP_CACHE:" + //groupCache = "GROUP_CACHE:" //groupInfoCache = "GROUP_INFO_CACHE:" - groupOwnerIDCache = "GROUP_OWNER_ID:" - joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" - groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" - groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:" - allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:" - joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:" - groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:" - groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:" - conversationCache = "CONVERSATION_CACHE:" - conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" - extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" - extendMsgCache = "EXTEND_MSG_CACHE:" + //groupOwnerIDCache = "GROUP_OWNER_ID:" + //joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:" + //groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:" + //groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:" + allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:" + //joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:" + //groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:" + //groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:" + conversationCache = "CONVERSATION_CACHE:" + conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:" + + extendMsgSetCache = "EXTEND_MSG_SET_CACHE:" + extendMsgCache = "EXTEND_MSG_CACHE:" ) const scanCount = 3000 const RandomExpireAdjustment = 0.2 func (rc *RcClient) DelKeys() { - for _, key := range []string{groupCache, friendRelationCache, blackListCache, userInfoCache, groupInfoCacheKey, groupOwnerIDCache, joinedGroupListCache, - groupMemberInfoCache, groupAllMemberInfoCache, allFriendInfoCache} { + for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, + groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { fName := utils.GetSelfFuncName() var cursor uint64 var n int diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go new file mode 100644 index 000000000..7c8526121 --- /dev/null +++ b/pkg/common/db/cache/user.go @@ -0,0 +1,90 @@ +package cache + +import ( + "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils" + "context" + "encoding/json" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" + "time" +) + +const ( + UserExpireTime = time.Second * 60 * 60 * 12 + userInfoKey = "USER_INFO:" +) + +type UserCache struct { + userDB *relation.User + + expireTime time.Duration + redisClient *RedisClient + rcClient *rockscache.Client +} + +func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache { + return &UserCache{ + userDB: userDB, + expireTime: UserExpireTime, + redisClient: NewRedisClient(rdb), + rcClient: rockscache.NewClient(rdb, options), + } +} + +func (u *UserCache) getUserInfoKey(userID string) string { + return userInfoKey + userID +} + +func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) { + getUserInfo := func() (string, error) { + userInfo, err := u.userDB.Take(ctx, userID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(userInfo) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo) + }() + userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo) + if err != nil { + return nil, err + } + userInfo = &relation.User{} + err = json.Unmarshal([]byte(userInfoStr), userInfo) + return userInfo, utils.Wrap(err, "") +} + +func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) { + var users []*relation.User + for _, userID := range userIDs { + user, err := GetUserInfoFromCache(ctx, userID) + if err != nil { + return nil, err + } + users = append(users, user) + } + return users, nil +} + +func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) + }() + return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID) +} + +func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) { + for _, userID := range userIDs { + if err := u.DelUserInfo(ctx, userID); err != nil { + return err + } + } + return nil +} diff --git a/pkg/common/db/controller/chatlog.go b/pkg/common/db/controller/chatlog.go new file mode 100644 index 000000000..038561982 --- /dev/null +++ b/pkg/common/db/controller/chatlog.go @@ -0,0 +1,49 @@ +package controller + +import ( + "Open_IM/pkg/common/db/relation" + pbMsg "Open_IM/pkg/proto/msg" + "gorm.io/gorm" +) + +type ChatLogInterface interface { + CreateChatLog(msg pbMsg.MsgDataToMQ) error + GetChatLog(chatLog *relation.ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relation.ChatLog, error) +} + +func NewChatLogController(db *gorm.DB) ChatLogInterface { + return &ChatLogController{database: NewChatLogDataBase(db)} +} + +type ChatLogController struct { + database ChatLogDataBaseInterface +} + +func (c *ChatLogController) CreateChatLog(msg pbMsg.MsgDataToMQ) error { + return c.database.CreateChatLog(msg) +} + +func (c *ChatLogController) GetChatLog(chatLog *relation.ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relation.ChatLog, error) { + return c.database.GetChatLog(chatLog, pageNumber, showNumber, contentTypeList) +} + +type ChatLogDataBaseInterface interface { + CreateChatLog(msg pbMsg.MsgDataToMQ) error + GetChatLog(chatLog *relation.ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relation.ChatLog, error) +} + +type ChatLogDataBase struct { + chatLogDB *relation.ChatLog +} + +func NewChatLogDataBase(db *gorm.DB) ChatLogDataBaseInterface { + return &ChatLogDataBase{chatLogDB: relation.NewChatLog(db)} +} + +func (c *ChatLogDataBase) CreateChatLog(msg pbMsg.MsgDataToMQ) error { + return c.chatLogDB.Create(msg) +} + +func (c *ChatLogDataBase) GetChatLog(chatLog *relation.ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []relation.ChatLog, error) { + return c.chatLogDB.GetChatLog(chatLog, pageNumber, showNumber, contentTypeList) +} diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 295698d93..be5d4caa1 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -85,8 +85,8 @@ func (g *GroupController) AddUserToSuperGroup(ctx context.Context, groupID strin panic("implement me") } -func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) GroupInterface { - groupController := &GroupController{database: newGroupDatabase(db, rdb, mgoDB)} +func NewGroupController(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupInterface { + groupController := &GroupController{database: newGroupDatabase(db, rdb, mgoClient)} return groupController } @@ -133,23 +133,24 @@ type GroupDataBase struct { mongoDB *unrelation.SuperGroupMgoDB } -func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoDB *mongo.Client) GroupDataBaseInterface { +func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupDataBaseInterface { groupDB := relation.NewGroupDB(db) groupMemberDB := relation.NewGroupMemberDB(db) groupRequestDB := relation.NewGroupRequest(db) newDB := db + superGroupMgoDB := unrelation.NewSuperGroupMgoDB(mgoClient) database := &GroupDataBase{ groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, db: newDB, - cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, rockscache.Options{ + cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, superGroupMgoDB, rockscache.Options{ RandomExpireAdjustment: 0.2, DisableCacheRead: false, DisableCacheDelete: false, StrongConsistency: true, }), - mongoDB: unrelation.NewSuperGroupMgoDB(mgoDB), + mongoDB: superGroupMgoDB, } return database } @@ -223,7 +224,7 @@ func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, in return err } - if err = g.cache.DelJoinedSuperGroupIDs(ctx, initMemberIDList); err != nil { + if err = g.cache.BatchDelJoinedSuperGroupIDs(ctx, initMemberIDList); err != nil { _ = sess.AbortTransaction(ctx) return err } diff --git a/pkg/common/db/relation/chat_log_model.go b/pkg/common/db/relation/chat_log_model.go index 9cb54ac00..03583cfb5 100644 --- a/pkg/common/db/relation/chat_log_model.go +++ b/pkg/common/db/relation/chat_log_model.go @@ -2,7 +2,6 @@ package relation import ( "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" pbMsg "Open_IM/pkg/proto/msg" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -37,7 +36,11 @@ func (ChatLog) TableName() string { return "chat_logs" } -func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error { +func NewChatLog(db *gorm.DB) *ChatLog { + return &ChatLog{DB: db} +} + +func (c *ChatLog) Create(msg pbMsg.MsgDataToMQ) error { chatLog := new(ChatLog) copier.Copy(chatLog, msg.MsgData) switch msg.MsgData.SessionType { @@ -61,12 +64,11 @@ func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error { } chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime) chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime) - log.NewDebug("test", "this is ", chatLog) - return db.DB.MysqlDB.DefaultGormDB().Table("chat_logs").Create(chatLog).Error + return c.DB.Create(chatLog).Error } -func GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLog, error) { - mdb := ChatLogDB.Table("chat_logs") +func (c *ChatLog) GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLog, error) { + mdb := c.DB.Model(chatLog) if chatLog.SendTime.Unix() > 0 { mdb = mdb.Where("send_time > ? and send_time < ?", chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1)) } diff --git a/pkg/common/db/relation/file_model.go b/pkg/common/db/relation/file_model.go deleted file mode 100644 index e1b93b72b..000000000 --- a/pkg/common/db/relation/file_model.go +++ /dev/null @@ -1,50 +0,0 @@ -package relation - -import ( - "gorm.io/gorm" - "time" -) - -var AppDB *gorm.DB - -type AppVersion struct { - Version string `gorm:"column:version;size:64" json:"version"` - Type int `gorm:"column:type;primary_key" json:"type"` - UpdateTime int `gorm:"column:update_time" json:"update_time"` - ForceUpdate bool `gorm:"column:force_update" json:"force_update"` - FileName string `gorm:"column:file_name" json:"file_name"` - YamlName string `gorm:"column:yaml_name" json:"yaml_name"` - UpdateLog string `gorm:"column:update_log" json:"update_log"` -} - -func (AppVersion) TableName() string { - return "app_version" -} - -func UpdateAppVersion(appType int, version string, forceUpdate bool, fileName, yamlName, updateLog string) error { - updateTime := int(time.Now().Unix()) - app := AppVersion{ - Version: version, - Type: appType, - UpdateTime: updateTime, - FileName: fileName, - YamlName: yamlName, - ForceUpdate: forceUpdate, - UpdateLog: updateLog, - } - result := AppDB.Model(AppVersion{}).Where("type = ?", appType).Updates(map[string]interface{}{"force_update": forceUpdate, - "version": version, "update_time": int(time.Now().Unix()), "file_name": fileName, "yaml_name": yamlName, "type": appType, "update_log": updateLog}) - if result.Error != nil { - return result.Error - } - if result.RowsAffected == 0 { - err := AppDB.Create(&app).Error - return err - } - return nil -} - -func GetNewestVersion(appType int) (*AppVersion, error) { - app := AppVersion{} - return &app, AppDB.Model(AppVersion{}).First(&app, appType).Error -} diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index c51bfcec9..1ad8659d8 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -108,8 +108,8 @@ func (db *SuperGroupMgoDB) RemoverUserFromSuperGroup(ctx context.Context, groupI func (db *SuperGroupMgoDB) GetSuperGroupByUserID(ctx context.Context, userID string) (*UserToSuperGroup, error) { var user UserToSuperGroup - _ = db.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) - return &user, nil + err := db.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) + return &user, utils.Wrap(err, "") } func (db *SuperGroupMgoDB) DeleteSuperGroup(ctx context.Context, groupID string) error { diff --git a/pkg/utils/local_cache.go b/pkg/utils/local_cache.go deleted file mode 100644 index 6fc35a79e..000000000 --- a/pkg/utils/local_cache.go +++ /dev/null @@ -1,78 +0,0 @@ -package utils - -import ( - "Open_IM/pkg/common/config" - rocksCache "Open_IM/pkg/common/db/rocks_cache" - "Open_IM/pkg/common/log" - "Open_IM/pkg/getcdv3" - pbCache "Open_IM/pkg/proto/cache" - "context" - "errors" - "sync" -) - -type GroupMemberUserIDListHash struct { - MemberListHash uint64 - UserIDList []string -} - -var CacheGroupMemberUserIDList = make(map[string]*GroupMemberUserIDListHash, 0) -var CacheGroupMtx sync.RWMutex - -func GetGroupMemberUserIDList(ctx context.Context, groupID string, operationID string) ([]string, error) { - groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(ctx, groupID) - if err != nil { - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - delete(CacheGroupMemberUserIDList, groupID) - log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) - return nil, Wrap(err, groupID) - } - - CacheGroupMtx.Lock() - defer CacheGroupMtx.Unlock() - - if groupHashRemote == 0 { - log.Info(operationID, "groupHashRemote == 0 ", groupID) - delete(CacheGroupMemberUserIDList, groupID) - return []string{}, nil - } - - groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] - if ok && groupInLocalCache.MemberListHash == groupHashRemote { - log.Debug(operationID, "in local cache ", groupID) - return groupInLocalCache.UserIDList, nil - } - log.Debug(operationID, "not in local cache or hash changed", groupID, " remote hash ", groupHashRemote, " in cache ", ok) - memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) - if err != nil { - log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) - return nil, Wrap(err, groupID) - } - CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} - return memberUserIDListRemote, nil -} - -func GetGroupMemberUserIDListHashFromRemote(ctx context.Context, groupID string) (uint64, error) { - return rocksCache.GetGroupMemberListHashFromCache(ctx, groupID) -} - -func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { - getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} - etcdConn, err := getcdv3.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImCacheName) - if err != nil { - return nil, err - } - client := pbCache.NewCacheClient(etcdConn) - cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) - if err != nil { - log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) - return nil, Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") - } - if cacheResp.CommonResp.ErrCode != 0 { - errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg - log.NewError(operationID, errMsg) - return nil, errors.New("errMsg") - } - return cacheResp.UserIDList, nil -}