mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-10 13:09:51 +08:00
reaction message add expiration
This commit is contained in:
parent
a662414b7f
commit
915118140f
@ -34,9 +34,7 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
|||||||
if !req.IsReact {
|
if !req.IsReact {
|
||||||
log.Debug(req.OperationID, "redis handle firstly", req.String())
|
log.Debug(req.OperationID, "redis handle firstly", req.String())
|
||||||
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||||
//redis处理
|
|
||||||
for k, v := range req.ReactionExtensionList {
|
for k, v := range req.ReactionExtensionList {
|
||||||
//抢占分布式锁
|
|
||||||
err := lockMessageTypeKey(req.ClientMsgID, k)
|
err := lockMessageTypeKey(req.ClientMsgID, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||||
@ -63,7 +61,6 @@ func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.S
|
|||||||
log.Debug(req.OperationID, "redis handle secondly", req.String())
|
log.Debug(req.OperationID, "redis handle secondly", req.String())
|
||||||
|
|
||||||
for k, v := range req.ReactionExtensionList {
|
for k, v := range req.ReactionExtensionList {
|
||||||
//抢占分布式锁
|
|
||||||
err := lockMessageTypeKey(req.ClientMsgID, k)
|
err := lockMessageTypeKey(req.ClientMsgID, k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||||
@ -108,8 +105,17 @@ func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, er
|
|||||||
r.Result = append(r.Result, temp)
|
r.Result = append(r.Result, temp)
|
||||||
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||||
}
|
}
|
||||||
|
func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
|
||||||
|
temp := new(msg.KeyValueResp)
|
||||||
|
temp.KeyValue = keyValue
|
||||||
|
temp.ErrCode = errCode
|
||||||
|
temp.ErrMsg = errMsg
|
||||||
|
r.Result = append(r.Result, temp)
|
||||||
|
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||||
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
|
func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
|
||||||
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
||||||
var rResp msg.GetMessageListReactionExtensionsResp
|
var rResp msg.GetMessageListReactionExtensionsResp
|
||||||
for _, messageValue := range req.MessageReactionKeyList {
|
for _, messageValue := range req.MessageReactionKeyList {
|
||||||
var oneMessage msg.SingleMessageExtensionResult
|
var oneMessage msg.SingleMessageExtensionResult
|
||||||
@ -143,6 +149,7 @@ func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *m
|
|||||||
}
|
}
|
||||||
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||||
}
|
}
|
||||||
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
|
||||||
return &rResp, nil
|
return &rResp, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -152,7 +159,56 @@ func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.M
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
|
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
|
||||||
return
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
||||||
|
var rResp msg.DeleteMessageListReactionExtensionsResp
|
||||||
|
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
||||||
|
if err != nil {
|
||||||
|
rResp.ErrCode = 100
|
||||||
|
rResp.ErrMsg = err.Error()
|
||||||
|
for _, value := range req.ReactionExtensionList {
|
||||||
|
temp := new(msg.KeyValueResp)
|
||||||
|
temp.KeyValue = value
|
||||||
|
temp.ErrMsg = err.Error()
|
||||||
|
temp.ErrCode = 100
|
||||||
|
rResp.Result = append(rResp.Result, temp)
|
||||||
|
}
|
||||||
|
return &rResp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if isExists {
|
||||||
|
log.Debug(req.OperationID, "redis handle this delete", req.String())
|
||||||
|
for _, v := range req.ReactionExtensionList {
|
||||||
|
err := lockMessageTypeKey(req.ClientMsgID, v.TypeKey)
|
||||||
|
if err != nil {
|
||||||
|
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||||
|
if err != nil && err != go_redis.Nil {
|
||||||
|
setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
temp := new(server_api_params.KeyValue)
|
||||||
|
utils.JsonStringToStruct(redisValue, temp)
|
||||||
|
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||||
|
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||||
|
if newErr != nil {
|
||||||
|
setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false)
|
||||||
|
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
|
||||||
|
return &rResp, nil
|
||||||
}
|
}
|
||||||
func lockMessageTypeKey(clientMsgID, typeKey string) (err error) {
|
func lockMessageTypeKey(clientMsgID, typeKey string) (err error) {
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
|
@ -32,6 +32,24 @@ func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID strin
|
|||||||
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
|
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
|
||||||
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory)
|
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory)
|
||||||
}
|
}
|
||||||
|
func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32,
|
||||||
|
req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool) {
|
||||||
|
var m base_info.ReactionMessageDeleteNotification
|
||||||
|
m.SourceID = req.SourceID
|
||||||
|
m.OpUserID = req.OpUserID
|
||||||
|
m.SessionType = req.SessionType
|
||||||
|
keyMap := make(map[string]*open_im_sdk.KeyValue)
|
||||||
|
for _, valueResp := range resp.Result {
|
||||||
|
if valueResp.ErrCode == 0 {
|
||||||
|
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.SuccessReactionExtensionList = keyMap
|
||||||
|
m.ClientMsgID = req.ClientMsgID
|
||||||
|
m.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||||
|
|
||||||
|
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory)
|
||||||
|
}
|
||||||
func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool) {
|
func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool) {
|
||||||
options := make(map[string]bool, 5)
|
options := make(map[string]bool, 5)
|
||||||
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
|
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
|
||||||
|
@ -452,6 +452,11 @@ func (d *DataBases) GetOneMessageAllReactionList(clientMsgID string, sessionType
|
|||||||
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||||
return d.RDB.HGetAll(context.Background(), key).Result()
|
return d.RDB.HGetAll(context.Background(), key).Result()
|
||||||
|
|
||||||
|
}
|
||||||
|
func (d *DataBases) DeleteOneMessageKey(clientMsgID string, sessionType int32, subKey string) error {
|
||||||
|
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||||
|
return d.RDB.HDel(context.Background(), key, subKey).Err()
|
||||||
|
|
||||||
}
|
}
|
||||||
func (d *DataBases) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
func (d *DataBases) SetMessageReactionExpire(clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
||||||
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user