From 915118140fa08af41547b8f24aeaf5a4d9183bd3 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 12 Dec 2022 20:41:40 +0800 Subject: [PATCH] reaction message add expiration --- internal/rpc/msg/extend_msg.go | 64 +++++++++++++++++++-- internal/rpc/msg/extend_msg.notification.go | 18 ++++++ pkg/common/db/RedisModel.go | 5 ++ 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index af4d042bc..ea0f02088 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -34,9 +34,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S if !req.IsReact { log.Debug(req.OperationID, "redis handle firstly", req.String()) rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() - //redis处理 for k, v := range req.ReactionExtensionList { - //抢占分布式锁 err := lockMessageTypeKey(req.ClientMsgID, k) if err != nil { setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) @@ -63,7 +61,6 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S log.Debug(req.OperationID, "redis handle secondly", req.String()) for k, v := range req.ReactionExtensionList { - //抢占分布式锁 err := lockMessageTypeKey(req.ClientMsgID, k) if err != nil { setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) @@ -108,8 +105,17 @@ func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, er r.Result = append(r.Result, temp) _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) } +func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) { + temp := new(msg.KeyValueResp) + temp.KeyValue = keyValue + temp.ErrCode = errCode + temp.ErrMsg = errMsg + r.Result = append(r.Result, temp) + _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) +} func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) { + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) var rResp msg.GetMessageListReactionExtensionsResp for _, messageValue := range req.MessageReactionKeyList { var oneMessage msg.SingleMessageExtensionResult @@ -143,6 +149,7 @@ func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *m } rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) } + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) return &rResp, nil } @@ -152,7 +159,56 @@ func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.M } func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) { - return + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) + var rResp msg.DeleteMessageListReactionExtensionsResp + isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) + if err != nil { + rResp.ErrCode = 100 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + rResp.Result = append(rResp.Result, temp) + } + return &rResp, nil + } + + if isExists { + log.Debug(req.OperationID, "redis handle this delete", req.String()) + for _, v := range req.ReactionExtensionList { + err := lockMessageTypeKey(req.ClientMsgID, v.TypeKey) + if err != nil { + setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v) + continue + } + + redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey) + if err != nil && err != go_redis.Nil { + setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v) + continue + } + temp := new(server_api_params.KeyValue) + utils.JsonStringToStruct(redisValue, temp) + if v.LatestUpdateTime != temp.LatestUpdateTime { + setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp) + continue + } else { + newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey) + if newErr != nil { + setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp) + continue + } + setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v) + } + } + } else { + + } + ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) + return &rResp, nil } func lockMessageTypeKey(clientMsgID, typeKey string) (err error) { for i := 0; i < 3; i++ { diff --git a/internal/rpc/msg/extend_msg.notification.go b/internal/rpc/msg/extend_msg.notification.go index 0c8b3b126..f4f8bd88b 100644 --- a/internal/rpc/msg/extend_msg.notification.go +++ b/internal/rpc/msg/extend_msg.notification.go @@ -32,6 +32,24 @@ func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID strin m.MsgFirstModifyTime = resp.MsgFirstModifyTime messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory) } +func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32, + req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool) { + var m base_info.ReactionMessageDeleteNotification + m.SourceID = req.SourceID + m.OpUserID = req.OpUserID + m.SessionType = req.SessionType + keyMap := make(map[string]*open_im_sdk.KeyValue) + for _, valueResp := range resp.Result { + if valueResp.ErrCode == 0 { + keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue + } + } + m.SuccessReactionExtensionList = keyMap + m.ClientMsgID = req.ClientMsgID + m.MsgFirstModifyTime = req.MsgFirstModifyTime + + messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory) +} func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool) { options := make(map[string]bool, 5) utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index b9692cf1d..ab9f54118 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -452,6 +452,11 @@ func (d *DataBases) GetOneMessageAllReactionList(clientMsgID string, sessionType key := getMessageReactionExPrefix(clientMsgID, sessionType) return d.RDB.HGetAll(context.Background(), key).Result() +} +func (d *DataBases) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error { + key := getMessageReactionExPrefix(clientMsgID, sessionType) + return d.RDB.HDel(context.Background(), key, subKey).Err() + } func (d *DataBases) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { key := getMessageReactionExPrefix(clientMsgID, sessionType)