diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 35a388e8e..d50f451ee 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -83,7 +83,6 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) { notStoragePushMsgList = append(notStoragePushMsgList, v) } - } } @@ -107,6 +106,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) + for _, v := range storageMsgList { sendMessageToPushMQ(v, msgChannelValue.aggregationID) } diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index baf48bc78..b0d83959f 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -12,52 +12,43 @@ import ( "time" ) -func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) { - var rResp msg.ModifyMessageReactionExtensionsResp - var extendMsgResp msg.ExtendMsgResp - var failedExtendMsgResp msg.ExtendMsgResp - var oneExtendMsg msg.ExtendMsg - var failedExtendMsg msg.ExtendMsg - oneExtendMsg.ClientMsgID = req.ClientMsgID - oneExtendMsg.MsgFirstModifyTime = req.MsgFirstModifyTime - oneFailedReactionExtensionList := make(map[string]*msg.KeyValueResp) - oneSuccessReactionExtensionList := make(map[string]*msg.KeyValueResp) +func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) { + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) + var rResp msg.SetMessageReactionExtensionsResp + rResp.ClientMsgID = req.ClientMsgID isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) if err != nil { - extendMsgResp.ErrCode = 100 - extendMsgResp.ErrMsg = err.Error() - for k, value := range req.ReactionExtensionList { + rResp.ErrCode = 100 + rResp.ErrMsg = err.Error() + for _, value := range req.ReactionExtensionList { temp := new(msg.KeyValueResp) temp.KeyValue = value temp.ErrMsg = err.Error() temp.ErrCode = 100 - oneFailedReactionExtensionList[k] = temp + rResp.Result = append(rResp.Result, temp) } - oneExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList - extendMsgResp.ExtendMsg = &oneExtendMsg - rResp.FailedList = append(rResp.FailedList, &extendMsgResp) return &rResp, nil } if !isExists { if !req.IsReact { log.Debug(req.OperationID, "redis handle firstly", req.String()) - oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() + rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() //redis处理 for k, v := range req.ReactionExtensionList { //抢占分布式锁 err := lockMessageTypeKey(req.ClientMsgID, k) if err != nil { - setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) continue } v.LatestUpdateTime = utils.GetCurrentTimestampByMill() newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) if newerr != nil { - setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v) continue } - setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) } _, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) if err != nil { @@ -74,53 +65,47 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.M //抢占分布式锁 err := lockMessageTypeKey(req.ClientMsgID, k) if err != nil { - setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v) continue } redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) if err != nil && err != go_redis.Nil { - setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v) continue } temp := new(server_api_params.KeyValue) utils.JsonStringToStruct(redisValue, temp) if v.LatestUpdateTime != temp.LatestUpdateTime { - setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp) + setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) continue } else { v.LatestUpdateTime = utils.GetCurrentTimestampByMill() newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) if newerr != nil { - setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp) + setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp) continue } - setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) + setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v) } } } - - oneExtendMsg.ReactionExtensionList = oneSuccessReactionExtensionList - extendMsgResp.ExtendMsg = &oneExtendMsg - failedExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList - failedExtendMsgResp.ExtendMsg = &failedExtendMsg - rResp.FailedList = append(rResp.FailedList, &failedExtendMsgResp) - rResp.SuccessList = append(rResp.FailedList, &extendMsgResp) if !isExists && !req.IsReact { ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true) } else { ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) } + log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) return &rResp, nil } -func setKeyResultInfo(m map[string]*msg.KeyValueResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) { +func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) { temp := new(msg.KeyValueResp) temp.KeyValue = keyValue temp.ErrCode = errCode temp.ErrMsg = errMsg - m[typeKey] = temp + r.Result = append(r.Result, temp) _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) } diff --git a/internal/rpc/msg/extend_msg.notification.go b/internal/rpc/msg/extend_msg.notification.go index 957e50e40..cf91463cd 100644 --- a/internal/rpc/msg/extend_msg.notification.go +++ b/internal/rpc/msg/extend_msg.notification.go @@ -13,7 +13,7 @@ import ( ) func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32, - req *msg.ModifyMessageReactionExtensionsReq, resp *msg.ModifyMessageReactionExtensionsResp, isHistory bool) { + req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool) { m := make(map[string]interface{}) m["rep"] = req m["resp"] = resp diff --git a/pkg/base_info/msg.go b/pkg/base_info/msg.go index 193ea40ab..4e6f0cbd6 100644 --- a/pkg/base_info/msg.go +++ b/pkg/base_info/msg.go @@ -69,8 +69,8 @@ type ModifyMessageReactionExtensionsReq struct { type ModifyMessageReactionExtensionsResp struct { CommResp Data struct { - SuccessList []*msg.ExtendMsgResp `json:"successList"` - FailedList []*msg.ExtendMsgResp `json:"failedList"` + SuccessList []*msg.ExtendMsgResp `json:"successList,omitempty"` + FailedList []*msg.ExtendMsgResp `json:"failedList,omitempty"` } `json:"data"` } diff --git a/pkg/proto/msg/msg.proto b/pkg/proto/msg/msg.proto index 48292ae06..3dd7391f8 100644 --- a/pkg/proto/msg/msg.proto +++ b/pkg/proto/msg/msg.proto @@ -167,6 +167,26 @@ message ModifyMessageReactionExtensionsReq { bool isExternalExtensions = 10; int64 msgFirstModifyTime = 11; } +message SetMessageReactionExtensionsReq { + string operationID = 1; + string sourceID = 2; + string opUserID = 3; + int32 sessionType = 4; + map reactionExtensionList = 5; + string clientMsgID = 6; + google.protobuf.StringValue ex = 7; + google.protobuf.StringValue attachedInfo = 8; + bool isReact = 9; + bool isExternalExtensions = 10; + int64 msgFirstModifyTime = 11; +} +message SetMessageReactionExtensionsResp { + int32 errCode = 1; + string errMsg = 2; + string clientMsgID = 3; + int64 msgFirstModifyTime = 4; + repeated KeyValueResp result = 5; +} message ModifyMessageReactionExtensionsResp { int32 errCode = 1; @@ -229,7 +249,7 @@ service msg { rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp); // modify msg - rpc SetMessageReactionExtensions(ModifyMessageReactionExtensionsReq) returns(ModifyMessageReactionExtensionsResp); + rpc SetMessageReactionExtensions(SetMessageReactionExtensionsReq) returns(SetMessageReactionExtensionsResp); rpc GetMessageListReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp); rpc AddMessageReactionExtensions(ModifyMessageReactionExtensionsReq) returns(ModifyMessageReactionExtensionsResp); rpc DeleteMessageReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp);