From 59fe9f0ca96f61b1d4025e1eb8021b083aee6eb7 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 24 Mar 2023 11:29:39 +0800 Subject: [PATCH] add redis pipeline --- pkg/common/db/cache/redis.go | 72 +++++++++++++++++------------ pkg/rpcclient/notification/group.go | 5 +- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 6a5d01227..b15d4d603 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -13,8 +13,9 @@ import ( pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" + "github.com/gogo/protobuf/jsonpb" + + "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" ) @@ -212,28 +213,30 @@ func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform return utils.Wrap1(c.rdb.HDel(ctx, key, fields...).Err()) } -func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) { - var errResult error - for _, v := range seqList { +func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + pipe := c.rdb.Pipeline() + for _, v := range seqs { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) - result, err := c.rdb.Get(ctx, key).Result() - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - } else { - msg := sdkws.MsgData{} - err = jsonpb.UnmarshalString(result, &msg) - if err != nil { - errResult = err - failedSeqList = append(failedSeqList, v) - } else { - seqMsg = append(seqMsg, &msg) - } - + if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { + return nil, nil, err } } - return seqMsg, failedSeqList, errResult + result, err := pipe.Exec(ctx) + for i, v := range result { + if v.Err() != nil { + failedSeqs = append(failedSeqs, seqs[i]) + } else { + msg := sdkws.MsgData{} + err = jsonpb.UnmarshalString(v.String(), &msg) + if err != nil { + failedSeqs = append(failedSeqs, seqs[i]) + } else { + seqMsgs = append(seqMsgs, &msg) + } + } + } + return seqMsgs, failedSeqs, err } func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) (int, error) { @@ -258,12 +261,14 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] } func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error { + pipe := c.rdb.Pipeline() for _, v := range msgList { - if err := c.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil { + if err := pipe.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil { return utils.Wrap1(err) } } - return nil + _, err := pipe.Exec(ctx) + return utils.Wrap1(err) } func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { @@ -275,12 +280,14 @@ func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { if err != nil { return utils.Wrap1(err) } + pipe := c.rdb.Pipeline() for _, v := range vals { - if err := c.rdb.Del(ctx, v).Err(); err != nil { + if err := pipe.Del(ctx, v).Err(); err != nil { return utils.Wrap1(err) } } - return nil + _, err = pipe.Exec(ctx) + return utils.Wrap1(err) } func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { @@ -306,26 +313,31 @@ func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, push return false, nil } if isInviteSignal { + pipe := c.rdb.Pipeline() for _, userID := range inviteeUserIDs { timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) if err != nil { return false, utils.Wrap1(err) } - keyList := signalListCache + userID - err = c.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err() + keys := signalListCache + userID + err = pipe.LPush(ctx, keys, msg.ClientMsgID).Err() if err != nil { return false, utils.Wrap1(err) } - err = c.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err() + err = pipe.Expire(ctx, keys, time.Duration(timeout)*time.Second).Err() if err != nil { return false, utils.Wrap1(err) } key := signalCache + msg.ClientMsgID - err = c.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() + err = pipe.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() if err != nil { return false, utils.Wrap1(err) } } + _, err := pipe.Exec(ctx) + if err != nil { + return false, utils.Wrap1(err) + } } return true, nil } @@ -367,8 +379,8 @@ func (c *cache) DelUserSignalList(ctx context.Context, userID string) error { return utils.Wrap1(c.rdb.Del(ctx, signalListCache+userID).Err()) } -func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error { - for _, seq := range seqList { +func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { + for _, seq := range seqs { key := messageCache + userID + "_" + strconv.Itoa(int(seq)) result, err := c.rdb.Get(ctx, key).Result() if err != nil { diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 0d2ee583f..385972fdd 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -2,6 +2,7 @@ package notification import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -196,7 +197,7 @@ func (c *Check) groupNotification(ctx context.Context, contentType int32, m prot } // 创建群后调用 -func (c *Check) GroupCreatedNotification(ctx context.Context, groupID string, initMemberList []string) { +func (c *Check) GroupCreatedNotification(ctx context.Context, groupID string, initMembers []string) { GroupCreatedTips := sdkws.GroupCreatedTips{Group: &sdkws.GroupInfo{}, OpUser: &sdkws.GroupMemberFullInfo{}, GroupOwnerUser: &sdkws.GroupMemberFullInfo{}} if err := c.setOpUserInfo(ctx, groupID, GroupCreatedTips.OpUser); err != nil { @@ -210,7 +211,7 @@ func (c *Check) GroupCreatedNotification(ctx context.Context, groupID string, in if err := c.setGroupOwnerInfo(ctx, groupID, GroupCreatedTips.GroupOwnerUser); err != nil { return } - for _, v := range initMemberList { + for _, v := range initMembers { var groupMemberInfo sdkws.GroupMemberFullInfo if err := c.setGroupMemberInfo(ctx, groupID, v, &groupMemberInfo); err != nil { continue