reaction message add expiration

This commit is contained in:
Gordon 2022-12-12 15:34:49 +08:00
parent b3f5c2f185
commit acee47e861
5 changed files with 45 additions and 40 deletions

View File

@ -83,7 +83,6 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) { if !(!isSenderSync && msgChannelValue.aggregationID == v.MsgData.SendID) {
notStoragePushMsgList = append(notStoragePushMsgList, v) notStoragePushMsgList = append(notStoragePushMsgList, v)
} }
} }
} }
@ -107,6 +106,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCount += uint64(len(storageMsgList))
singleMsgSuccessCountMutex.Unlock() singleMsgSuccessCountMutex.Unlock()
och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
for _, v := range storageMsgList { for _, v := range storageMsgList {
sendMessageToPushMQ(v, msgChannelValue.aggregationID) sendMessageToPushMQ(v, msgChannelValue.aggregationID)
} }

View File

@ -12,52 +12,43 @@ import (
"time" "time"
) )
func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) { func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
var rResp msg.ModifyMessageReactionExtensionsResp log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var extendMsgResp msg.ExtendMsgResp var rResp msg.SetMessageReactionExtensionsResp
var failedExtendMsgResp msg.ExtendMsgResp rResp.ClientMsgID = req.ClientMsgID
var oneExtendMsg msg.ExtendMsg
var failedExtendMsg msg.ExtendMsg
oneExtendMsg.ClientMsgID = req.ClientMsgID
oneExtendMsg.MsgFirstModifyTime = req.MsgFirstModifyTime
oneFailedReactionExtensionList := make(map[string]*msg.KeyValueResp)
oneSuccessReactionExtensionList := make(map[string]*msg.KeyValueResp)
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil { if err != nil {
extendMsgResp.ErrCode = 100 rResp.ErrCode = 100
extendMsgResp.ErrMsg = err.Error() rResp.ErrMsg = err.Error()
for k, value := range req.ReactionExtensionList { for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp) temp := new(msg.KeyValueResp)
temp.KeyValue = value temp.KeyValue = value
temp.ErrMsg = err.Error() temp.ErrMsg = err.Error()
temp.ErrCode = 100 temp.ErrCode = 100
oneFailedReactionExtensionList[k] = temp rResp.Result = append(rResp.Result, temp)
} }
oneExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList
extendMsgResp.ExtendMsg = &oneExtendMsg
rResp.FailedList = append(rResp.FailedList, &extendMsgResp)
return &rResp, nil return &rResp, nil
} }
if !isExists { if !isExists {
if !req.IsReact { if !req.IsReact {
log.Debug(req.OperationID, "redis handle firstly", req.String()) log.Debug(req.OperationID, "redis handle firstly", req.String())
oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
//redis处理 //redis处理
for k, v := range req.ReactionExtensionList { for k, v := range req.ReactionExtensionList {
//抢占分布式锁 //抢占分布式锁
err := lockMessageTypeKey(req.ClientMsgID, k) err := lockMessageTypeKey(req.ClientMsgID, k)
if err != nil { if err != nil {
setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue continue
} }
v.LatestUpdateTime = utils.GetCurrentTimestampByMill() v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil { if newerr != nil {
setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v)
continue continue
} }
setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
} }
_, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour) _, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
if err != nil { if err != nil {
@ -74,53 +65,47 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.M
//抢占分布式锁 //抢占分布式锁
err := lockMessageTypeKey(req.ClientMsgID, k) err := lockMessageTypeKey(req.ClientMsgID, k)
if err != nil { if err != nil {
setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue continue
} }
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
if err != nil && err != go_redis.Nil { if err != nil && err != go_redis.Nil {
setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
continue continue
} }
temp := new(server_api_params.KeyValue) temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp) utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime { if v.LatestUpdateTime != temp.LatestUpdateTime {
setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp) setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue continue
} else { } else {
v.LatestUpdateTime = utils.GetCurrentTimestampByMill() v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil { if newerr != nil {
setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp) setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
continue continue
} }
setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
} }
} }
} }
oneExtendMsg.ReactionExtensionList = oneSuccessReactionExtensionList
extendMsgResp.ExtendMsg = &oneExtendMsg
failedExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList
failedExtendMsgResp.ExtendMsg = &failedExtendMsg
rResp.FailedList = append(rResp.FailedList, &failedExtendMsgResp)
rResp.SuccessList = append(rResp.FailedList, &extendMsgResp)
if !isExists && !req.IsReact { if !isExists && !req.IsReact {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true) ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true)
} else { } else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false)
} }
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil return &rResp, nil
} }
func setKeyResultInfo(m map[string]*msg.KeyValueResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) { func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
temp := new(msg.KeyValueResp) temp := new(msg.KeyValueResp)
temp.KeyValue = keyValue temp.KeyValue = keyValue
temp.ErrCode = errCode temp.ErrCode = errCode
temp.ErrMsg = errMsg temp.ErrMsg = errMsg
m[typeKey] = temp r.Result = append(r.Result, temp)
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
} }

View File

@ -13,7 +13,7 @@ import (
) )
func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32, func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.ModifyMessageReactionExtensionsReq, resp *msg.ModifyMessageReactionExtensionsResp, isHistory bool) { req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["rep"] = req m["rep"] = req
m["resp"] = resp m["resp"] = resp

View File

@ -69,8 +69,8 @@ type ModifyMessageReactionExtensionsReq struct {
type ModifyMessageReactionExtensionsResp struct { type ModifyMessageReactionExtensionsResp struct {
CommResp CommResp
Data struct { Data struct {
SuccessList []*msg.ExtendMsgResp `json:"successList"` SuccessList []*msg.ExtendMsgResp `json:"successList,omitempty"`
FailedList []*msg.ExtendMsgResp `json:"failedList"` FailedList []*msg.ExtendMsgResp `json:"failedList,omitempty"`
} `json:"data"` } `json:"data"`
} }

View File

@ -167,6 +167,26 @@ message ModifyMessageReactionExtensionsReq {
bool isExternalExtensions = 10; bool isExternalExtensions = 10;
int64 msgFirstModifyTime = 11; int64 msgFirstModifyTime = 11;
} }
message SetMessageReactionExtensionsReq {
string operationID = 1;
string sourceID = 2;
string opUserID = 3;
int32 sessionType = 4;
map <string, server_api_params.KeyValue>reactionExtensionList = 5;
string clientMsgID = 6;
google.protobuf.StringValue ex = 7;
google.protobuf.StringValue attachedInfo = 8;
bool isReact = 9;
bool isExternalExtensions = 10;
int64 msgFirstModifyTime = 11;
}
message SetMessageReactionExtensionsResp {
int32 errCode = 1;
string errMsg = 2;
string clientMsgID = 3;
int64 msgFirstModifyTime = 4;
repeated KeyValueResp result = 5;
}
message ModifyMessageReactionExtensionsResp { message ModifyMessageReactionExtensionsResp {
int32 errCode = 1; int32 errCode = 1;
@ -229,7 +249,7 @@ service msg {
rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp); rpc GetWriteDiffMsg(GetWriteDiffMsgReq) returns(GetWriteDiffMsgResp);
// modify msg // modify msg
rpc SetMessageReactionExtensions(ModifyMessageReactionExtensionsReq) returns(ModifyMessageReactionExtensionsResp); rpc SetMessageReactionExtensions(SetMessageReactionExtensionsReq) returns(SetMessageReactionExtensionsResp);
rpc GetMessageListReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp); rpc GetMessageListReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp);
rpc AddMessageReactionExtensions(ModifyMessageReactionExtensionsReq) returns(ModifyMessageReactionExtensionsResp); rpc AddMessageReactionExtensions(ModifyMessageReactionExtensionsReq) returns(ModifyMessageReactionExtensionsResp);
rpc DeleteMessageReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp); rpc DeleteMessageReactionExtensions(OperateMessageListReactionExtensionsReq) returns(OperateMessageListReactionExtensionsResp);