This commit is contained in:
wangchuxiao 2023-02-02 19:40:54 +08:00
parent 174c3e5fae
commit 47b725ac69
7 changed files with 273 additions and 47 deletions

View File

@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"time"
)
@ -22,11 +23,11 @@ type BlackCache struct {
rcClient *rockscache.Client
}
func NewBlackCache(blackDB *relation.Black) *BlackCache {
func NewBlackCache(rdb redis.UniversalClient, blackDB *relation.BlackGorm, options rockscache.Options) *BlackCache {
return &BlackCache{
blackDB: nil,
expireTime: 0,
rcClient: nil,
blackDB: blackDB,
expireTime: blackExpireTime,
rcClient: rockscache.NewClient(rdb, options),
}
}
@ -49,7 +50,7 @@ func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs [
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs)
}()
blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, time.Second*30*60, getBlackIDList)
blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList)
if err != nil {
return nil, utils.Wrap(err, "")
}

View File

@ -1 +1,180 @@
package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"golang.org/x/tools/go/ssa/testdata/src/strconv"
"time"
)
const (
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12
)
type ConversationCache struct {
conversationDB *relation.ConversationGorm
expireTime time.Duration
rcClient *rockscache.Client
}
func NewConversationCache(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationCache {
return &ConversationCache{conversationDB: conversationDB, expireTime: conversationExpireTime, rcClient: rockscache.NewClient(rdb, options)}
}
func (c *ConversationCache) getConversationKey(ownerUserID, conversationID string) string {
return conversationKey + ownerUserID + ":" + conversationID
}
func (c *ConversationCache) getConversationIDsKey(ownerUserID string) string {
return conversationIDsKey + ownerUserID
}
func (c *ConversationCache) getRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID
}
func (c *ConversationCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsKey + groupID
}
func (c *ConversationCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
getConversationIDs := func() (string, error) {
conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(conversationIDs)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs)
}()
conversationIDsStr, err := c.rcClient.Fetch(c.getConversationIDsKey(ownerUserID), time.Second*30*60, getConversationIDs)
err = json.Unmarshal([]byte(conversationIDsStr), &conversationIDs)
if err != nil {
return nil, utils.Wrap(err, "")
}
return conversationIDs, nil
}
func (c *ConversationCache) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID)
}()
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err")
}
func (c *ConversationCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *table.ConversationModel, err error) {
getConversation := func() (string, error) {
conversation, err := relation.GetConversation(ownerUserID, conversationID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(conversation)
if err != nil {
return "", utils.Wrap(err, "conversation Marshal failed")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "conversation", *conversation)
}()
conversationStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
if err != nil {
return nil, err
}
conversation = &table.ConversationModel{}
err = json.Unmarshal([]byte(conversationStr), &conversation)
return conversation, utils.Wrap(err, "Unmarshal failed")
}
func (c *ConversationCache) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID)
}()
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err")
}
func (c *ConversationCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []table.ConversationModel, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations)
}()
for _, conversationID := range conversationIDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
return nil, err
}
conversations = append(conversations, *conversation)
}
return conversations, nil
}
func (c *ConversationCache) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []table.ConversationModel, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations)
}()
IDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
var conversationIDs []table.ConversationModel
for _, conversationID := range IDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
return nil, err
}
conversationIDs = append(conversationIDs, *conversation)
}
return conversationIDs, nil
}
func (c *ConversationCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
getConversation := func() (string, error) {
conversation, err := relation.GetConversation(ownerUserID, conversationID)
if err != nil {
return "", err
}
return strconv.Itoa(int(conversation.RecvMsgOpt)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt)
}()
optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
if err != nil {
return 0, err
}
return strconv.Atoi(optStr)
}
func (c *ConversationCache) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error {
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed")
}
func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
return nil, nil
}
func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) {
return nil
}
func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) {
return
}
func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
return
}

View File

@ -5,6 +5,7 @@ import (
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"Open_IM/pkg/utilsv2"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
@ -115,7 +116,14 @@ func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID s
if err != nil {
return nil, err
}
friendInfo := &relation.FriendUser{}
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
return groupMember, utils.Wrap(err, "")
friend = &table.FriendModel{}
err = json.Unmarshal([]byte(friendStr), friend)
return friend, utils.Wrap(err, "")
}
func (f *FriendCache) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID)
}()
return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID))
}

View File

@ -324,9 +324,8 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData
return false, nil
}
case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept:
return false, errors.New("signalInfo do not need offlinePush")
return false, nil
default:
log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content))
return false, nil
}
if isInviteSignal {

View File

@ -2,32 +2,35 @@ package cache
import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
"strconv"
"time"
)
const (
UserExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
)
type UserCache struct {
userDB *relation.User
userDB *relation.UserGorm
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
}
func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache {
func NewUserCache(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCache {
return &UserCache{
userDB: userDB,
expireTime: UserExpireTime,
expireTime: userExpireTime,
redisClient: NewRedisClient(rdb),
rcClient: rockscache.NewClient(rdb, options),
}
@ -37,7 +40,11 @@ func (u *UserCache) getUserInfoKey(userID string) string {
return userInfoKey + userID
}
func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) {
func (u *UserCache) getUserGlobalRecvMsgOptKey(userID string) string {
return userGlobalRecvMsgOptKey + userID
}
func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *table.UserModel, err error) {
getUserInfo := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
@ -52,17 +59,17 @@ func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *r
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo)
}()
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo)
userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo)
if err != nil {
return nil, err
}
userInfo = &relation.User{}
userInfo = &table.UserModel{}
err = json.Unmarshal([]byte(userInfoStr), userInfo)
return userInfo, utils.Wrap(err, "")
}
func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) {
var users []*relation.User
func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*table.UserModel, error) {
var users []*table.UserModel
for _, userID := range userIDs {
user, err := GetUserInfoFromCache(ctx, userID)
if err != nil {
@ -77,7 +84,7 @@ func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error)
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID)
return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID))
}
func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) {
@ -88,3 +95,28 @@ func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err err
}
return nil
}
func (u *UserCache) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
getUserGlobalRecvMsgOpt := func() (string, error) {
userInfo, err := u.userDB.Take(ctx, userID)
if err != nil {
return "", err
}
return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "opt", opt)
}()
optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt)
if err != nil {
return 0, err
}
return strconv.Atoi(optStr)
}
func (u *UserCache) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID))
}

View File

@ -0,0 +1,27 @@
package localcache
import (
"Open_IM/pkg/proto/conversation"
"context"
"google.golang.org/grpc"
"sync"
)
type ConversationLocalCache struct {
lock sync.Mutex
SuperGroupRecvMsgNotNotifyUserIDs map[string][]string
rpc *grpc.ClientConn
conversation conversation.ConversationClient
}
func NewConversationLocalCache(rpc *grpc.ClientConn) ConversationLocalCache {
return ConversationLocalCache{
SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0),
rpc: rpc,
conversation: conversation.NewConversationClient(rpc),
}
}
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string {
return []string{}
}

View File

@ -1,35 +1,15 @@
package relation
import (
"Open_IM/pkg/common/db/table"
"gorm.io/gorm"
)
var ConversationDB *gorm.DB
type Conversation struct {
OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"`
DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"`
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"`
UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"`
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
type ConversationGorm struct {
DB *gorm.DB
}
func (Conversation) TableName() string {
return "conversations"
}
func SetConversation(conversation Conversation) (bool, error) {
func SetConversation(conversation table.ConversationModel) (bool, error) {
var isUpdate bool
newConversation := conversation
if ConversationDB.Model(&Conversation{}).Find(&newConversation).RowsAffected == 0 {
@ -93,7 +73,7 @@ func GetExistConversationUserIDList(ownerUserIDList []string, conversationID str
return resultArr, nil
}
func GetConversation(OwnerUserID, conversationID string) (Conversation, error) {
func GetConversation(OwnerUserID, conversationID string) (table.ConversationModel, error) {
var conversation Conversation
err := ConversationDB.Table("conversations").Where("owner_user_id=? and conversation_id=?", OwnerUserID, conversationID).Take(&conversation).Error
return conversation, err
@ -116,7 +96,7 @@ func UpdateColumnsConversations(ownerUserIDList []string, conversationID string,
}
func GetConversationIDListByUserID(userID string) ([]string, error) {
func GetConversationIDsByUserID(userID string) ([]string, error) {
var IDList []string
err := ConversationDB.Model(&Conversation{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &IDList).Error
return IDList, err