mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-05-04 01:01:09 +08:00
reaction message
This commit is contained in:
parent
6d45f0730c
commit
6f20cdd905
@ -1,16 +1,184 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
go_redis "github.com/go-redis/redis/v8"
|
||||
|
||||
"time"
|
||||
)
|
||||
|
||||
func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
|
||||
return
|
||||
var rResp msg.ModifyMessageReactionExtensionsResp
|
||||
var extendMsgResp msg.ExtendMsgResp
|
||||
var failedExtendMsgResp msg.ExtendMsgResp
|
||||
var oneExtendMsg msg.ExtendMsg
|
||||
var failedExtendMsg msg.ExtendMsg
|
||||
oneExtendMsg.ClientMsgID = req.ClientMsgID
|
||||
oneExtendMsg.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
oneFailedReactionExtensionList := make(map[string]*msg.KeyValueResp)
|
||||
oneSuccessReactionExtensionList := make(map[string]*msg.KeyValueResp)
|
||||
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
extendMsgResp.ErrCode = 100
|
||||
extendMsgResp.ErrMsg = err.Error()
|
||||
for k, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
oneFailedReactionExtensionList[k] = temp
|
||||
}
|
||||
oneExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList
|
||||
extendMsgResp.ExtendMsg = &oneExtendMsg
|
||||
rResp.FailedList = append(rResp.FailedList, &extendMsgResp)
|
||||
return &rResp, nil
|
||||
}
|
||||
|
||||
if !isExists {
|
||||
if !req.IsReact {
|
||||
oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
//redis处理
|
||||
for k, v := range req.ReactionExtensionList {
|
||||
//抢占分布式锁
|
||||
err := lockMessageTypeKey(req.ClientMsgID, k)
|
||||
if err != nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||
if err != nil && err != go_redis.Nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
temp := new(server_api_params.KeyValue)
|
||||
utils.JsonStringToStruct(redisValue, temp)
|
||||
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
continue
|
||||
} else {
|
||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||
if newerr != nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||
continue
|
||||
}
|
||||
setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} else {
|
||||
//mongo处理
|
||||
}
|
||||
|
||||
} else {
|
||||
for k, v := range req.ReactionExtensionList {
|
||||
//抢占分布式锁
|
||||
err := lockMessageTypeKey(req.ClientMsgID, k)
|
||||
if err != nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||
if err != nil && err != go_redis.Nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
temp := new(server_api_params.KeyValue)
|
||||
utils.JsonStringToStruct(redisValue, temp)
|
||||
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
continue
|
||||
} else {
|
||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||
if newerr != nil {
|
||||
setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||
continue
|
||||
}
|
||||
setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
oneExtendMsg.ReactionExtensionList = oneSuccessReactionExtensionList
|
||||
extendMsgResp.ExtendMsg = &oneExtendMsg
|
||||
failedExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList
|
||||
failedExtendMsgResp.ExtendMsg = &failedExtendMsg
|
||||
rResp.FailedList = append(rResp.FailedList, &failedExtendMsgResp)
|
||||
rResp.SuccessList = append(rResp.FailedList, &extendMsgResp)
|
||||
return &rResp, nil
|
||||
|
||||
}
|
||||
func setKeyResultInfo(m map[string]*msg.KeyValueResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = keyValue
|
||||
temp.ErrCode = errCode
|
||||
temp.ErrMsg = errMsg
|
||||
m[typeKey] = temp
|
||||
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.OperateMessageListReactionExtensionsReq) (resp *msg.OperateMessageListReactionExtensionsResp, err error) {
|
||||
//for _, messageValue := range req.MessageReactionKeyList {
|
||||
// isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID,req.SessionType)
|
||||
// if err != nil {
|
||||
//
|
||||
// }
|
||||
// var failedList []*msg.ExtendMsgResp
|
||||
// var successList []*msg.ExtendMsgResp
|
||||
// var oneExtendMsg msg.ExtendMsg
|
||||
// oneExtendMsg.ClientMsgID = req.ClientMsgID
|
||||
// oneFailedReactionExtensionList:=make(map[string]*msg.KeyValueResp)
|
||||
// oneSuccessReactionExtensionList:=make(map[string]*msg.KeyValueResp)
|
||||
// if !isExists {
|
||||
// if !req.IsReact {
|
||||
// oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
// //redis处理
|
||||
// for k, v := range req.ReactionExtensionList {
|
||||
// //抢占分布式锁
|
||||
// err:=lockMessageTypeKey(req.ClientMsgID,k)
|
||||
// if err != nil {
|
||||
// setKeyResultInfo(oneFailedReactionExtensionList,100,err.Error(),req.ClientMsgID,k,v)
|
||||
// continue
|
||||
// }
|
||||
// redisValue,err:=db.DB.GetMessageTypeKeyValue(req.ClientMsgID,req.SessionType,k)
|
||||
// if err != nil&&err!=go_redis.Nil {
|
||||
// setKeyResultInfo(oneFailedReactionExtensionList,200,err.Error(),req.ClientMsgID,k,v)
|
||||
// continue
|
||||
// }
|
||||
// temp:=new(server_api_params.KeyValue)
|
||||
// utils.JsonStringToStruct(redisValue,temp)
|
||||
// if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
// setKeyResultInfo(oneFailedReactionExtensionList,300,"message have update",req.ClientMsgID,k,temp)
|
||||
// continue
|
||||
// }else{
|
||||
// v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// newerr:=db.DB.SetMessageTypeKeyValue(req.ClientMsgID,req.SessionType,k,utils.StructToJsonString(v))
|
||||
// if newerr != nil {
|
||||
// setKeyResultInfo(oneFailedReactionExtensionList,201,newerr.Error(),req.ClientMsgID,k,temp)
|
||||
// continue
|
||||
// }
|
||||
// setKeyResultInfo(oneSuccessReactionExtensionList,0,"",req.ClientMsgID,k,v)
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
// }else{
|
||||
// //mongo处理
|
||||
// }
|
||||
//
|
||||
// }else{
|
||||
//
|
||||
// }
|
||||
// return
|
||||
//}
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
|
||||
@ -20,3 +188,16 @@ func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.M
|
||||
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.OperateMessageListReactionExtensionsReq) (resp *msg.OperateMessageListReactionExtensionsResp, err error) {
|
||||
return
|
||||
}
|
||||
func lockMessageTypeKey(clientMsgID, typeKey string) (err error) {
|
||||
for i := 0; i < 3; i++ {
|
||||
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
|
||||
if err != nil {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
||||
}
|
||||
|
13
internal/rpc/msg/extend_msg.notification.go
Normal file
13
internal/rpc/msg/extend_msg.notification.go
Normal file
@ -0,0 +1,13 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
pbFriend "Open_IM/pkg/proto/friend"
|
||||
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
||||
)
|
||||
|
||||
func ExtendMessageUpdatedNotification(operationID, changedUserID string, needNotifiedUserID string, opUserID string) {
|
||||
selfInfoUpdatedTips := open_im_sdk.UserInfoUpdatedTips{UserID: changedUserID}
|
||||
commID := pbFriend.CommID{FromUserID: opUserID, ToUserID: needNotifiedUserID, OpUserID: opUserID, OperationID: operationID}
|
||||
friendNotification(&commID, constant.FriendInfoUpdatedNotification, &selfInfoUpdatedTips)
|
||||
}
|
@ -38,6 +38,7 @@ const (
|
||||
groupMinSeq = "GROUP_MIN_SEQ:"
|
||||
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
|
||||
exTypeKeyLocker = "EX_LOCK:"
|
||||
)
|
||||
|
||||
func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) {
|
||||
@ -437,3 +438,46 @@ func (d *DataBases) GetUserBadgeUnreadCountSum(uid string) (int, error) {
|
||||
seq, err := d.RDB.Get(context.Background(), key).Result()
|
||||
return utils.StringToInt(seq), err
|
||||
}
|
||||
func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) {
|
||||
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||
n, err := d.RDB.Exists(context.Background(), key).Result()
|
||||
if n > 0 {
|
||||
return true, err
|
||||
} else {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
func (d *DataBases) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) {
|
||||
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||
result, err := d.RDB.HGet(context.Background(), key, typeKey).Result()
|
||||
return result, err
|
||||
|
||||
}
|
||||
func (d *DataBases) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error {
|
||||
key := getMessageReactionExPrefix(clientMsgID, sessionType)
|
||||
return d.RDB.HSet(context.Background(), key, typeKey, value).Err()
|
||||
|
||||
}
|
||||
func (d *DataBases) LockMessageTypeKey(clientMsgID string, TypeKey string) error {
|
||||
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
||||
return d.RDB.SetNX(context.Background(), key, 1, time.Minute).Err()
|
||||
}
|
||||
func (d *DataBases) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error {
|
||||
key := exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
||||
return d.RDB.Del(context.Background(), key).Err()
|
||||
|
||||
}
|
||||
|
||||
func getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
|
||||
switch sessionType {
|
||||
case constant.SingleChatType:
|
||||
return "EX_SINGLE_" + clientMsgID
|
||||
case constant.GroupChatType:
|
||||
return "EX_GROUP_" + clientMsgID
|
||||
case constant.SuperGroupChatType:
|
||||
return "EX_SUPER_GROUP_" + clientMsgID
|
||||
case constant.NotificationChatType:
|
||||
return "EX_NOTIFICATION" + clientMsgID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user