diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 4110d28ee..ec8fbcc42 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" "time" ) @@ -22,11 +23,11 @@ type BlackCache struct { rcClient *rockscache.Client } -func NewBlackCache(blackDB *relation.Black) *BlackCache { +func NewBlackCache(rdb redis.UniversalClient, blackDB *relation.BlackGorm, options rockscache.Options) *BlackCache { return &BlackCache{ - blackDB: nil, - expireTime: 0, - rcClient: nil, + blackDB: blackDB, + expireTime: blackExpireTime, + rcClient: rockscache.NewClient(rdb, options), } } @@ -49,7 +50,7 @@ func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs [ defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs) }() - blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, time.Second*30*60, getBlackIDList) + blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList) if err != nil { return nil, utils.Wrap(err, "") } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 08bf029d3..d7ef7fb61 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -1 +1,180 @@ package cache + +import ( + "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils" + "context" + "encoding/json" + "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" + "golang.org/x/tools/go/ssa/testdata/src/strconv" + "time" +) + +const ( + conversationKey = "CONVERSATION:" + conversationIDsKey = "CONVERSATION_IDS:" + recvMsgOptKey = "RECV_MSG_OPT:" + superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + conversationExpireTime = time.Second * 60 * 60 * 12 +) + +type ConversationCache struct { + conversationDB *relation.ConversationGorm + expireTime time.Duration + rcClient *rockscache.Client +} + +func NewConversationCache(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationCache { + return &ConversationCache{conversationDB: conversationDB, expireTime: conversationExpireTime, rcClient: rockscache.NewClient(rdb, options)} +} + +func (c *ConversationCache) getConversationKey(ownerUserID, conversationID string) string { + return conversationKey + ownerUserID + ":" + conversationID +} + +func (c *ConversationCache) getConversationIDsKey(ownerUserID string) string { + return conversationIDsKey + ownerUserID +} + +func (c *ConversationCache) getRecvMsgOptKey(ownerUserID, conversationID string) string { + return recvMsgOptKey + ownerUserID + ":" + conversationID +} + +func (c *ConversationCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { + return superGroupRecvMsgNotNotifyUserIDsKey + groupID +} + +func (c *ConversationCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { + getConversationIDs := func() (string, error) { + conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(conversationIDs) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs) + }() + conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs) + err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs) + if err != nil { + return nil, utils.Wrap(err, "") + } + return conversationIDs, nil +} + +func (c *ConversationCache) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) + }() + return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") +} + +func (c *ConversationCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *table.ConversationModel, err error) { + getConversation := func() (string, error) { + conversation, err := relation.GetConversation(ownerUserID, conversationID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(conversation) + if err != nil { + return "", utils.Wrap(err, "conversation Marshal failed") + } + return string(bytes), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "conversation", *conversation) + }() + conversationStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) + if err != nil { + return nil, err + } + conversation = &table.ConversationModel{} + err = json.Unmarshal([]byte(conversationStr), &conversation) + return conversation, utils.Wrap(err, "Unmarshal failed") +} + +func (c *ConversationCache) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID) + }() + return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err") +} + +func (c *ConversationCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []table.ConversationModel, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) + }() + for _, conversationID := range conversationIDs { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + return nil, err + } + conversations = append(conversations, *conversation) + } + return conversations, nil +} + +func (c *ConversationCache) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []table.ConversationModel, err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) + }() + IDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + if err != nil { + return nil, err + } + var conversationIDs []table.ConversationModel + for _, conversationID := range IDs { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + return nil, err + } + conversationIDs = append(conversationIDs, *conversation) + } + return conversationIDs, nil +} + +func (c *ConversationCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { + getConversation := func() (string, error) { + conversation, err := relation.GetConversation(ownerUserID, conversationID) + if err != nil { + return "", err + } + return strconv.Itoa(int(conversation.RecvMsgOpt)), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) + }() + optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) + if err != nil { + return 0, err + } + return strconv.Atoi(optStr) +} + +func (c *ConversationCache) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { + return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed") +} + +func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { + return nil, nil +} + +func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { + return nil +} + +func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { + return +} + +func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { + return +} diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index b76f58dc6..8e03279c9 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/db/table" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" + "Open_IM/pkg/utilsv2" "context" "encoding/json" "github.com/dtm-labs/rockscache" @@ -115,7 +116,14 @@ func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID s if err != nil { return nil, err } - friendInfo := &relation.FriendUser{} - err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember) - return groupMember, utils.Wrap(err, "") + friend = &table.FriendModel{} + err = json.Unmarshal([]byte(friendStr), friend) + return friend, utils.Wrap(err, "") +} + +func (f *FriendCache) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID) + }() + return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID)) } diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 3f7488b15..6f090228a 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -324,9 +324,8 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData return false, nil } case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: - return false, errors.New("signalInfo do not need offlinePush") + return false, nil default: - log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content)) return false, nil } if isInviteSignal { diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 7c8526121..d9ba1e2c8 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -2,32 +2,35 @@ package cache import ( "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" + "strconv" "time" ) const ( - UserExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" + userExpireTime = time.Second * 60 * 60 * 12 + userInfoKey = "USER_INFO:" + userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" ) type UserCache struct { - userDB *relation.User + userDB *relation.UserGorm expireTime time.Duration redisClient *RedisClient rcClient *rockscache.Client } -func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache { +func NewUserCache(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCache { return &UserCache{ userDB: userDB, - expireTime: UserExpireTime, + expireTime: userExpireTime, redisClient: NewRedisClient(rdb), rcClient: rockscache.NewClient(rdb, options), } @@ -37,7 +40,11 @@ func (u *UserCache) getUserInfoKey(userID string) string { return userInfoKey + userID } -func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) { +func (u *UserCache) getUserGlobalRecvMsgOptKey(userID string) string { + return userGlobalRecvMsgOptKey + userID +} + +func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *table.UserModel, err error) { getUserInfo := func() (string, error) { userInfo, err := u.userDB.Take(ctx, userID) if err != nil { @@ -52,17 +59,17 @@ func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *r defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo) }() - userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo) + userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo) if err != nil { return nil, err } - userInfo = &relation.User{} + userInfo = &table.UserModel{} err = json.Unmarshal([]byte(userInfoStr), userInfo) return userInfo, utils.Wrap(err, "") } -func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) { - var users []*relation.User +func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*table.UserModel, error) { + var users []*table.UserModel for _, userID := range userIDs { user, err := GetUserInfoFromCache(ctx, userID) if err != nil { @@ -77,7 +84,7 @@ func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error) defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) }() - return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID) + return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID)) } func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) { @@ -88,3 +95,28 @@ func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err err } return nil } + +func (u *UserCache) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { + getUserGlobalRecvMsgOpt := func() (string, error) { + userInfo, err := u.userDB.Take(ctx, userID) + if err != nil { + return "", err + } + return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "opt", opt) + }() + optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt) + if err != nil { + return 0, err + } + return strconv.Atoi(optStr) +} + +func (u *UserCache) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) + }() + return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID)) +} diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go new file mode 100644 index 000000000..8a190a8fc --- /dev/null +++ b/pkg/common/db/localcache/conversation.go @@ -0,0 +1,27 @@ +package localcache + +import ( + "Open_IM/pkg/proto/conversation" + "context" + "google.golang.org/grpc" + "sync" +) + +type ConversationLocalCache struct { + lock sync.Mutex + SuperGroupRecvMsgNotNotifyUserIDs map[string][]string + rpc *grpc.ClientConn + conversation conversation.ConversationClient +} + +func NewConversationLocalCache(rpc *grpc.ClientConn) ConversationLocalCache { + return ConversationLocalCache{ + SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0), + rpc: rpc, + conversation: conversation.NewConversationClient(rpc), + } +} + +func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string { + return []string{} +} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index de2e92663..9b0e25481 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -1,35 +1,15 @@ package relation import ( + "Open_IM/pkg/common/db/table" "gorm.io/gorm" ) -var ConversationDB *gorm.DB - -type Conversation struct { - OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` - ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` - ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` - UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` - GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` - RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` - UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"` - DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"` - IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` - IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` - BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` - GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` - IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"` - UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"` - AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` - Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` +type ConversationGorm struct { + DB *gorm.DB } -func (Conversation) TableName() string { - return "conversations" -} - -func SetConversation(conversation Conversation) (bool, error) { +func SetConversation(conversation table.ConversationModel) (bool, error) { var isUpdate bool newConversation := conversation if ConversationDB.Model(&Conversation{}).Find(&newConversation).RowsAffected == 0 { @@ -93,7 +73,7 @@ func GetExistConversationUserIDList(ownerUserIDList []string, conversationID str return resultArr, nil } -func GetConversation(OwnerUserID, conversationID string) (Conversation, error) { +func GetConversation(OwnerUserID, conversationID string) (table.ConversationModel, error) { var conversation Conversation err := ConversationDB.Table("conversations").Where("owner_user_id=? and conversation_id=?", OwnerUserID, conversationID).Take(&conversation).Error return conversation, err @@ -116,7 +96,7 @@ func UpdateColumnsConversations(ownerUserIDList []string, conversationID string, } -func GetConversationIDListByUserID(userID string) ([]string, error) { +func GetConversationIDsByUserID(userID string) ([]string, error) { var IDList []string err := ConversationDB.Model(&Conversation{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &IDList).Error return IDList, err