reaction message update

This commit is contained in:
Gordon 2022-12-19 11:18:23 +08:00
parent 25aa6a4345
commit 20a2cb3d3c
4 changed files with 171 additions and 30 deletions

View File

@ -16,6 +16,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.SetMessageReactionExtensionsResp var rResp msg.SetMessageReactionExtensionsResp
rResp.ClientMsgID = req.ClientMsgID rResp.ClientMsgID = req.ClientMsgID
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil { if err != nil {
rResp.ErrCode = 100 rResp.ErrCode = 100
@ -54,32 +55,67 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String()) log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
} }
} else { } else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for k, v := range req.ReactionExtensionList { for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
if err != nil && err != go_redis.Nil {
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
continue
}
temp := new(server_api_params.KeyValue) temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp) if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
if v.LatestUpdateTime != temp.LatestUpdateTime { utils.CopyStructFields(temp, &vv)
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp) if v.LatestUpdateTime != vv.LatestUpdateTime {
continue setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
} else {
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
continue continue
} }
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
} }
temp.TypeKey = k
temp.Value = v.Value
temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
setValue[k] = temp
}
err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
} }
} }
@ -114,10 +150,14 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
} }
} }
if !isExists && !req.IsReact { if !isExists {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true) if !req.IsReact {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true)
} else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
}
} else { } else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
} }
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil return &rResp, nil
@ -171,7 +211,23 @@ func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *m
oneMessage.ReactionExtensionList = keyMap oneMessage.ReactionExtensionList = keyMap
} else { } else {
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
if err != nil {
oneMessage.ErrCode = 100
oneMessage.ErrMsg = err.Error()
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
continue
}
keyMap := make(map[string]*server_api_params.KeyValue)
for k, v := range mongoValue.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
temp.TypeKey = v.TypeKey
temp.Value = v.Value
temp.LatestUpdateTime = v.LatestUpdateTime
keyMap[k] = temp
}
oneMessage.ReactionExtensionList = keyMap
} }
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage) rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
} }
@ -230,9 +286,72 @@ func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *ms
} }
} }
} else { } else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for _, v := range req.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
utils.CopyStructFields(temp, &vv)
if v.LatestUpdateTime != vv.LatestUpdateTime {
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
continue
}
} else {
setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
continue
}
temp.TypeKey = v.TypeKey
setValue[v.TypeKey] = temp
}
err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
}
} }
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false) ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String()) log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil return &rResp, nil
} }

View File

@ -14,7 +14,7 @@ import (
) )
func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32, func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool) { req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageModifierNotification var m base_info.ReactionMessageModifierNotification
m.SourceID = req.SourceID m.SourceID = req.SourceID
m.OpUserID = req.OpUserID m.OpUserID = req.OpUserID
@ -30,10 +30,10 @@ func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID strin
m.IsReact = resp.IsReact m.IsReact = resp.IsReact
m.IsExternalExtensions = req.IsExternalExtensions m.IsExternalExtensions = req.IsExternalExtensions
m.MsgFirstModifyTime = resp.MsgFirstModifyTime m.MsgFirstModifyTime = resp.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory) messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
} }
func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32, func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool) { req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageDeleteNotification var m base_info.ReactionMessageDeleteNotification
m.SourceID = req.SourceID m.SourceID = req.SourceID
m.OpUserID = req.OpUserID m.OpUserID = req.OpUserID
@ -48,14 +48,15 @@ func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string
m.ClientMsgID = req.ClientMsgID m.ClientMsgID = req.ClientMsgID
m.MsgFirstModifyTime = req.MsgFirstModifyTime m.MsgFirstModifyTime = req.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory) messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory, isReactionFromCache)
} }
func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool) { func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool, isReactionFromCache bool) {
options := make(map[string]bool, 5) options := make(map[string]bool, 5)
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false) utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(options, constant.IsReactionFromCache, isReactionFromCache)
if !isHistory { if !isHistory {
utils.SetSwitchFromOptions(options, constant.IsHistory, false) utils.SetSwitchFromOptions(options, constant.IsHistory, false)
utils.SetSwitchFromOptions(options, constant.IsPersistent, false) utils.SetSwitchFromOptions(options, constant.IsPersistent, false)

View File

@ -5,9 +5,13 @@ import (
"time" "time"
) )
const GlOBLLOCK = "GLOBAL_LOCK"
type MessageLocker interface { type MessageLocker interface {
LockMessageTypeKey(clientMsgID, typeKey string) (err error) LockMessageTypeKey(clientMsgID, typeKey string) (err error)
UnLockMessageTypeKey(clientMsgID string, typeKey string) error UnLockMessageTypeKey(clientMsgID string, typeKey string) error
LockGlobalMessage(clientMsgID string) (err error)
UnLockGlobalMessage(clientMsgID string) (err error)
} }
type LockerMessage struct{} type LockerMessage struct{}
@ -26,7 +30,23 @@ func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err err
} }
return err return err
}
func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, GlOBLLOCK)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
} }
func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error { func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
} }
func (l *LockerMessage) UnLockGlobalMessage(clientMsgID string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, GlOBLLOCK)
}

View File

@ -171,6 +171,7 @@ const (
IsNotPrivate = "notPrivate" IsNotPrivate = "notPrivate"
IsSenderConversationUpdate = "senderConversationUpdate" IsSenderConversationUpdate = "senderConversationUpdate"
IsSenderNotificationPush = "senderNotificationPush" IsSenderNotificationPush = "senderNotificationPush"
IsReactionFromCache = "reactionFromCache"
//GroupStatus //GroupStatus
GroupOk = 0 GroupOk = 0