diff --git a/internal/rpc/msg/extend_msg.go b/internal/rpc/msg/extend_msg.go index 682485c50..29181ce5d 100644 --- a/internal/rpc/msg/extend_msg.go +++ b/internal/rpc/msg/extend_msg.go @@ -1,16 +1,184 @@ package msg import ( + "Open_IM/pkg/common/db" "Open_IM/pkg/proto/msg" + "Open_IM/pkg/proto/sdk_ws" + "Open_IM/pkg/utils" "context" + go_redis "github.com/go-redis/redis/v8" + + "time" ) func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) { - return + var rResp msg.ModifyMessageReactionExtensionsResp + var extendMsgResp msg.ExtendMsgResp + var failedExtendMsgResp msg.ExtendMsgResp + var oneExtendMsg msg.ExtendMsg + var failedExtendMsg msg.ExtendMsg + oneExtendMsg.ClientMsgID = req.ClientMsgID + oneExtendMsg.MsgFirstModifyTime = req.MsgFirstModifyTime + oneFailedReactionExtensionList := make(map[string]*msg.KeyValueResp) + oneSuccessReactionExtensionList := make(map[string]*msg.KeyValueResp) + isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType) + if err != nil { + extendMsgResp.ErrCode = 100 + extendMsgResp.ErrMsg = err.Error() + for k, value := range req.ReactionExtensionList { + temp := new(msg.KeyValueResp) + temp.KeyValue = value + temp.ErrMsg = err.Error() + temp.ErrCode = 100 + oneFailedReactionExtensionList[k] = temp + } + oneExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList + extendMsgResp.ExtendMsg = &oneExtendMsg + rResp.FailedList = append(rResp.FailedList, &extendMsgResp) + return &rResp, nil + } + + if !isExists { + if !req.IsReact { + oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() + //redis处理 + for k, v := range req.ReactionExtensionList { + //抢占分布式锁 + err := lockMessageTypeKey(req.ClientMsgID, k) + if err != nil { + setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) + continue + } + redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) + if err != nil && err != go_redis.Nil { + setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v) + continue + } + temp := new(server_api_params.KeyValue) + utils.JsonStringToStruct(redisValue, temp) + if v.LatestUpdateTime != temp.LatestUpdateTime { + setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp) + continue + } else { + v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) + if newerr != nil { + setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp) + continue + } + setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) + } + + } + + } else { + //mongo处理 + } + + } else { + for k, v := range req.ReactionExtensionList { + //抢占分布式锁 + err := lockMessageTypeKey(req.ClientMsgID, k) + if err != nil { + setKeyResultInfo(oneFailedReactionExtensionList, 100, err.Error(), req.ClientMsgID, k, v) + continue + } + redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k) + if err != nil && err != go_redis.Nil { + setKeyResultInfo(oneFailedReactionExtensionList, 200, err.Error(), req.ClientMsgID, k, v) + continue + } + temp := new(server_api_params.KeyValue) + utils.JsonStringToStruct(redisValue, temp) + if v.LatestUpdateTime != temp.LatestUpdateTime { + setKeyResultInfo(oneFailedReactionExtensionList, 300, "message have update", req.ClientMsgID, k, temp) + continue + } else { + v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)) + if newerr != nil { + setKeyResultInfo(oneFailedReactionExtensionList, 201, newerr.Error(), req.ClientMsgID, k, temp) + continue + } + setKeyResultInfo(oneSuccessReactionExtensionList, 0, "", req.ClientMsgID, k, v) + } + + } + } + oneExtendMsg.ReactionExtensionList = oneSuccessReactionExtensionList + extendMsgResp.ExtendMsg = &oneExtendMsg + failedExtendMsg.ReactionExtensionList = oneFailedReactionExtensionList + failedExtendMsgResp.ExtendMsg = &failedExtendMsg + rResp.FailedList = append(rResp.FailedList, &failedExtendMsgResp) + rResp.SuccessList = append(rResp.FailedList, &extendMsgResp) + return &rResp, nil + +} +func setKeyResultInfo(m map[string]*msg.KeyValueResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) { + temp := new(msg.KeyValueResp) + temp.KeyValue = keyValue + temp.ErrCode = errCode + temp.ErrMsg = errMsg + m[typeKey] = temp + _ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey) } func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.OperateMessageListReactionExtensionsReq) (resp *msg.OperateMessageListReactionExtensionsResp, err error) { + //for _, messageValue := range req.MessageReactionKeyList { + // isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID,req.SessionType) + // if err != nil { + // + // } + // var failedList []*msg.ExtendMsgResp + // var successList []*msg.ExtendMsgResp + // var oneExtendMsg msg.ExtendMsg + // oneExtendMsg.ClientMsgID = req.ClientMsgID + // oneFailedReactionExtensionList:=make(map[string]*msg.KeyValueResp) + // oneSuccessReactionExtensionList:=make(map[string]*msg.KeyValueResp) + // if !isExists { + // if !req.IsReact { + // oneExtendMsg.MsgFirstModifyTime = utils.GetCurrentTimestampByMill() + // //redis处理 + // for k, v := range req.ReactionExtensionList { + // //抢占分布式锁 + // err:=lockMessageTypeKey(req.ClientMsgID,k) + // if err != nil { + // setKeyResultInfo(oneFailedReactionExtensionList,100,err.Error(),req.ClientMsgID,k,v) + // continue + // } + // redisValue,err:=db.DB.GetMessageTypeKeyValue(req.ClientMsgID,req.SessionType,k) + // if err != nil&&err!=go_redis.Nil { + // setKeyResultInfo(oneFailedReactionExtensionList,200,err.Error(),req.ClientMsgID,k,v) + // continue + // } + // temp:=new(server_api_params.KeyValue) + // utils.JsonStringToStruct(redisValue,temp) + // if v.LatestUpdateTime != temp.LatestUpdateTime { + // setKeyResultInfo(oneFailedReactionExtensionList,300,"message have update",req.ClientMsgID,k,temp) + // continue + // }else{ + // v.LatestUpdateTime = utils.GetCurrentTimestampByMill() + // newerr:=db.DB.SetMessageTypeKeyValue(req.ClientMsgID,req.SessionType,k,utils.StructToJsonString(v)) + // if newerr != nil { + // setKeyResultInfo(oneFailedReactionExtensionList,201,newerr.Error(),req.ClientMsgID,k,temp) + // continue + // } + // setKeyResultInfo(oneSuccessReactionExtensionList,0,"",req.ClientMsgID,k,v) + // } + // + // } + // + // }else{ + // //mongo处理 + // } + // + // }else{ + // + // } + // return + //} return + } func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) { @@ -20,3 +188,16 @@ func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.M func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.OperateMessageListReactionExtensionsReq) (resp *msg.OperateMessageListReactionExtensionsResp, err error) { return } +func lockMessageTypeKey(clientMsgID, typeKey string) (err error) { + for i := 0; i < 3; i++ { + err = db.DB.LockMessageTypeKey(clientMsgID, typeKey) + if err != nil { + time.Sleep(time.Millisecond * 100) + continue + } else { + break + } + } + return err + +} diff --git a/internal/rpc/msg/extend_msg.notification.go b/internal/rpc/msg/extend_msg.notification.go new file mode 100644 index 000000000..34265fa47 --- /dev/null +++ b/internal/rpc/msg/extend_msg.notification.go @@ -0,0 +1,13 @@ +package msg + +import ( + "Open_IM/pkg/common/constant" + pbFriend "Open_IM/pkg/proto/friend" + open_im_sdk "Open_IM/pkg/proto/sdk_ws" +) + +func ExtendMessageUpdatedNotification(operationID, changedUserID string, needNotifiedUserID string, opUserID string) { + selfInfoUpdatedTips := open_im_sdk.UserInfoUpdatedTips{UserID: changedUserID} + commID := pbFriend.CommID{FromUserID: opUserID, ToUserID: needNotifiedUserID, OpUserID: opUserID, OperationID: operationID} + friendNotification(&commID, constant.FriendInfoUpdatedNotification, &selfInfoUpdatedTips) +} diff --git a/pkg/common/db/RedisModel.go b/pkg/common/db/RedisModel.go index a0eab5c30..4e5fd6dab 100644 --- a/pkg/common/db/RedisModel.go +++ b/pkg/common/db/RedisModel.go @@ -38,6 +38,7 @@ const ( groupMinSeq = "GROUP_MIN_SEQ:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" + exTypeKeyLocker = "EX_LOCK:" ) func (d *DataBases) JudgeAccountEXISTS(account string) (bool, error) { @@ -437,3 +438,46 @@ func (d *DataBases) GetUserBadgeUnreadCountSum(uid string) (int, error) { seq, err := d.RDB.Get(context.Background(), key).Result() return utils.StringToInt(seq), err } +func (d *DataBases) JudgeMessageReactionEXISTS(clientMsgID string, sessionType int32) (bool, error) { + key := getMessageReactionExPrefix(clientMsgID, sessionType) + n, err := d.RDB.Exists(context.Background(), key).Result() + if n > 0 { + return true, err + } else { + return false, err + } +} +func (d *DataBases) GetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey string) (string, error) { + key := getMessageReactionExPrefix(clientMsgID, sessionType) + result, err := d.RDB.HGet(context.Background(), key, typeKey).Result() + return result, err + +} +func (d *DataBases) SetMessageTypeKeyValue(clientMsgID string, sessionType int32, typeKey, value string) error { + key := getMessageReactionExPrefix(clientMsgID, sessionType) + return d.RDB.HSet(context.Background(), key, typeKey, value).Err() + +} +func (d *DataBases) LockMessageTypeKey(clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return d.RDB.SetNX(context.Background(), key, 1, time.Minute).Err() +} +func (d *DataBases) UnLockMessageTypeKey(clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return d.RDB.Del(context.Background(), key).Err() + +} + +func getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { + switch sessionType { + case constant.SingleChatType: + return "EX_SINGLE_" + clientMsgID + case constant.GroupChatType: + return "EX_GROUP_" + clientMsgID + case constant.SuperGroupChatType: + return "EX_SUPER_GROUP_" + clientMsgID + case constant.NotificationChatType: + return "EX_NOTIFICATION" + clientMsgID + } + return "" +}