mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
proto modify
This commit is contained in:
parent
d06235d28e
commit
3a23153e48
@ -129,10 +129,10 @@ func NewGinRouter() *gin.Engine {
|
||||
chatGroup.POST("/check_msg_is_send_success", manage.CheckMsgIsSendSuccess)
|
||||
chatGroup.POST("/set_msg_min_seq", msg.SetMsgMinSeq)
|
||||
|
||||
chatGroup.POST("/set_message_reaction_extensions", msg.SetMessageReactionExtensions)
|
||||
chatGroup.POST("/get_message_list_reaction_extensions", msg.GetMessageListReactionExtensions)
|
||||
chatGroup.POST("/add_message_reaction_extensions", msg.AddMessageReactionExtensions)
|
||||
chatGroup.POST("/delete_message_reaction_extensions", msg.DeleteMessageReactionExtensions)
|
||||
//chatGroup.POST("/set_message_reaction_extensions", msg.SetMessageReactionExtensions)
|
||||
//chatGroup.POST("/get_message_list_reaction_extensions", msg.GetMessageListReactionExtensions)
|
||||
//chatGroup.POST("/add_message_reaction_extensions", msg.AddMessageReactionExtensions)
|
||||
//chatGroup.POST("/delete_message_reaction_extensions", msg.DeleteMessageReactionExtensions)
|
||||
}
|
||||
////Conversation
|
||||
conversationGroup := r.Group("/conversation")
|
||||
|
@ -39,12 +39,12 @@ func (m *MsgCheck) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinS
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
|
||||
func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
|
||||
cc, err := m.getConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := msg.NewMsgClient(cc).PullMessageBySeqList(ctx, req)
|
||||
resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,7 @@ func (m *msgServer) DelMsgs(ctx context.Context, req *msg.DelMsgsReq) (*msg.DelM
|
||||
if _, err := m.MsgDatabase.DelMsgBySeqs(ctx, req.UserID, req.Seqs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
DeleteMessageNotification(ctx, req.UserID, req.Seqs)
|
||||
//DeleteMessageNotification(ctx, req.UserID, req.Seqs)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -1,216 +1,209 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"OpenIM/internal/common/notification"
|
||||
"OpenIM/pkg/common/constant"
|
||||
"OpenIM/pkg/common/log"
|
||||
"OpenIM/pkg/proto/msg"
|
||||
"OpenIM/pkg/proto/sdkws"
|
||||
"OpenIM/pkg/utils"
|
||||
"context"
|
||||
go_redis "github.com/go-redis/redis/v8"
|
||||
|
||||
"time"
|
||||
)
|
||||
|
||||
func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
|
||||
resp = &msg.SetMessageReactionExtensionsResp{}
|
||||
//resp.ClientMsgID = req.ClientMsgID
|
||||
//resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
|
||||
if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//if ExternalExtension
|
||||
if req.IsExternalExtensions {
|
||||
resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false)
|
||||
return resp, nil
|
||||
}
|
||||
isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !isExists {
|
||||
if !req.IsReact {
|
||||
resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
for k, v := range req.ReactionExtensions {
|
||||
err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
resp.IsReact = true
|
||||
_, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setValue := make(map[string]*sdkws.KeyValue)
|
||||
for k, v := range req.ReactionExtensions {
|
||||
|
||||
temp := new(sdkws.KeyValue)
|
||||
if vv, ok := mongoValue.ReactionExtensions[k]; ok {
|
||||
utils.CopyStructFields(temp, &vv)
|
||||
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
continue
|
||||
}
|
||||
}
|
||||
temp.TypeKey = k
|
||||
temp.Value = v.Value
|
||||
temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
setValue[k] = temp
|
||||
}
|
||||
err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
if err != nil {
|
||||
for _, value := range setValue {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
resp.Result = append(resp.Result, temp)
|
||||
}
|
||||
} else {
|
||||
for _, value := range setValue {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
resp.Result = append(resp.Result, temp)
|
||||
}
|
||||
}
|
||||
lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
if lockErr != nil {
|
||||
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Debug(req.OperationID, "redis handle secondly", req.String())
|
||||
|
||||
for k, v := range req.ReactionExtensionList {
|
||||
err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
||||
if err != nil {
|
||||
setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||
if err != nil && err != go_redis.Nil {
|
||||
setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v)
|
||||
continue
|
||||
}
|
||||
temp := new(sdkws.KeyValue)
|
||||
utils.JsonStringToStruct(redisValue, temp)
|
||||
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
continue
|
||||
} else {
|
||||
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||
if newerr != nil {
|
||||
setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||
continue
|
||||
}
|
||||
setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if !isExists {
|
||||
if !req.IsReact {
|
||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true)
|
||||
} else {
|
||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false)
|
||||
}
|
||||
} else {
|
||||
notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true)
|
||||
}
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String())
|
||||
return &resp, nil
|
||||
//resp = &msg.SetMessageReactionExtensionsResp{}
|
||||
////resp.ClientMsgID = req.ClientMsgID
|
||||
////resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
//
|
||||
//if err := CallbackSetMessageReactionExtensions(ctx, req); err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
////if ExternalExtension
|
||||
//if req.IsExternalExtensions {
|
||||
// resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, !req.IsReact, false)
|
||||
// return resp, nil
|
||||
//}
|
||||
//isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
//
|
||||
//if !isExists {
|
||||
// if !req.IsReact {
|
||||
// resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
// for k, v := range req.ReactionExtensions {
|
||||
// err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
// resp.IsReact = true
|
||||
// _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// } else {
|
||||
// err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// setValue := make(map[string]*sdkws.KeyValue)
|
||||
// for k, v := range req.ReactionExtensions {
|
||||
//
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// if vv, ok := mongoValue.ReactionExtensions[k]; ok {
|
||||
// utils.CopyStructFields(temp, &vv)
|
||||
// if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
// setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// }
|
||||
// }
|
||||
// temp.TypeKey = k
|
||||
// temp.Value = v.Value
|
||||
// temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// setValue[k] = temp
|
||||
// }
|
||||
// err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
// if err != nil {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// resp.Result = append(resp.Result, temp)
|
||||
// }
|
||||
// } else {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// resp.Result = append(resp.Result, temp)
|
||||
// }
|
||||
// }
|
||||
// lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
// if lockErr != nil {
|
||||
// log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//} else {
|
||||
// log.Debug(req.OperationID, "redis handle secondly", req.String())
|
||||
//
|
||||
// for k, v := range req.ReactionExtensionList {
|
||||
// err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
||||
// if err != nil {
|
||||
// setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||
// continue
|
||||
// }
|
||||
// redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||
// if err != nil && err != go_redis.Nil {
|
||||
// setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v)
|
||||
// continue
|
||||
// }
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(redisValue, temp)
|
||||
// if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
// setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// } else {
|
||||
// v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||
// if newerr != nil {
|
||||
// setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// }
|
||||
// setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v)
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//}
|
||||
//if !isExists {
|
||||
// if !req.IsReact {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, true, true)
|
||||
// } else {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, false)
|
||||
// }
|
||||
//} else {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &resp, false, true)
|
||||
//}
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String())
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
func (m *msgServer) setKeyResultInfo(ctx context.Context, r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = keyValue
|
||||
temp.ErrCode = errCode
|
||||
temp.ErrMsg = errMsg
|
||||
r.Result = append(r.Result, temp)
|
||||
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||
_ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
}
|
||||
func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
func (m *msgServer) setDeleteKeyResultInfo(ctx context.Context, r *msg.DeleteMessagesReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = keyValue
|
||||
temp.ErrCode = errCode
|
||||
temp.ErrMsg = errMsg
|
||||
r.Result = append(r.Result, temp)
|
||||
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
|
||||
_ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
}
|
||||
|
||||
func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg.GetMessagesReactionExtensionsReq) (resp *msg.GetMessagesReactionExtensionsResp, err error) {
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
var rResp msg.GetMessageListReactionExtensionsResp
|
||||
for _, messageValue := range req.MessageReactionKeyList {
|
||||
var oneMessage msg.SingleMessageExtensionResult
|
||||
oneMessage.ClientMsgID = messageValue.ClientMsgID
|
||||
|
||||
isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 100
|
||||
rResp.ErrMsg = err.Error()
|
||||
return &rResp, nil
|
||||
}
|
||||
if isExists {
|
||||
redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
oneMessage.ErrCode = 100
|
||||
oneMessage.ErrMsg = err.Error()
|
||||
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
continue
|
||||
}
|
||||
keyMap := make(map[string]*sdkws.KeyValue)
|
||||
|
||||
for k, v := range redisValue {
|
||||
temp := new(sdkws.KeyValue)
|
||||
utils.JsonStringToStruct(v, temp)
|
||||
keyMap[k] = temp
|
||||
}
|
||||
oneMessage.ReactionExtensionList = keyMap
|
||||
|
||||
} else {
|
||||
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
|
||||
if err != nil {
|
||||
oneMessage.ErrCode = 100
|
||||
oneMessage.ErrMsg = err.Error()
|
||||
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
continue
|
||||
}
|
||||
keyMap := make(map[string]*sdkws.KeyValue)
|
||||
|
||||
for k, v := range mongoValue.ReactionExtensionList {
|
||||
temp := new(sdkws.KeyValue)
|
||||
temp.TypeKey = v.TypeKey
|
||||
temp.Value = v.Value
|
||||
temp.LatestUpdateTime = v.LatestUpdateTime
|
||||
keyMap[k] = temp
|
||||
}
|
||||
oneMessage.ReactionExtensionList = keyMap
|
||||
}
|
||||
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
}
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return &rResp, nil
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
//var rResp msg.GetMessageListReactionExtensionsResp
|
||||
//for _, messageValue := range req.MessageReactionKeyList {
|
||||
// var oneMessage msg.SingleMessageExtensionResult
|
||||
// oneMessage.ClientMsgID = messageValue.ClientMsgID
|
||||
//
|
||||
// isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// if isExists {
|
||||
// redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
|
||||
// if err != nil {
|
||||
// oneMessage.ErrCode = 100
|
||||
// oneMessage.ErrMsg = err.Error()
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
// continue
|
||||
// }
|
||||
// keyMap := make(map[string]*sdkws.KeyValue)
|
||||
//
|
||||
// for k, v := range redisValue {
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(v, temp)
|
||||
// keyMap[k] = temp
|
||||
// }
|
||||
// oneMessage.ReactionExtensionList = keyMap
|
||||
//
|
||||
// } else {
|
||||
// mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// oneMessage.ErrCode = 100
|
||||
// oneMessage.ErrMsg = err.Error()
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
// continue
|
||||
// }
|
||||
// keyMap := make(map[string]*sdkws.KeyValue)
|
||||
//
|
||||
// for k, v := range mongoValue.ReactionExtensionList {
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// temp.TypeKey = v.TypeKey
|
||||
// temp.Value = v.Value
|
||||
// temp.LatestUpdateTime = v.LatestUpdateTime
|
||||
// keyMap[k] = temp
|
||||
// }
|
||||
// oneMessage.ReactionExtensionList = keyMap
|
||||
// }
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
//}
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
||||
@ -219,149 +212,149 @@ func (m *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.M
|
||||
}
|
||||
|
||||
func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessagesReactionExtensionsReq) (resp *msg.DeleteMessagesReactionExtensionsResp, err error) {
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
var rResp msg.DeleteMessagesReactionExtensionsResp
|
||||
callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
|
||||
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
||||
rResp.ErrCode = int32(callbackResp.ErrCode)
|
||||
rResp.ErrMsg = callbackResp.ErrMsg
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = callbackResp.ErrMsg
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
//if ExternalExtension
|
||||
if req.IsExternalExtensions {
|
||||
rResp.Result = callbackResp.ResultReactionExtensionList
|
||||
notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
|
||||
return &rResp, nil
|
||||
|
||||
}
|
||||
for _, v := range callbackResp.ResultReactionExtensions {
|
||||
if v.ErrCode != 0 {
|
||||
func(req *[]*sdkws.KeyValue, typeKey string) {
|
||||
for i := 0; i < len(*req); i++ {
|
||||
if (*req)[i].TypeKey == typeKey {
|
||||
*req = append((*req)[:i], (*req)[i+1:]...)
|
||||
}
|
||||
}
|
||||
}(&req.ReactionExtensionList, v.KeyValue.TypeKey)
|
||||
rResp.Result = append(rResp.Result, v)
|
||||
}
|
||||
}
|
||||
isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 100
|
||||
rResp.ErrMsg = err.Error()
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
|
||||
if isExists {
|
||||
log.Debug(req.OperationID, "redis handle this delete", req.String())
|
||||
for _, v := range req.ReactionExtensionList {
|
||||
err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
|
||||
if err != nil {
|
||||
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
continue
|
||||
}
|
||||
|
||||
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
if err != nil && err != go_redis.Nil {
|
||||
setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
continue
|
||||
}
|
||||
temp := new(sdkws.KeyValue)
|
||||
utils.JsonStringToStruct(redisValue, temp)
|
||||
if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
continue
|
||||
} else {
|
||||
newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
if newErr != nil {
|
||||
setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
|
||||
continue
|
||||
}
|
||||
setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 100
|
||||
rResp.ErrMsg = err.Error()
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
if err != nil {
|
||||
rResp.ErrCode = 200
|
||||
rResp.ErrMsg = err.Error()
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
setValue := make(map[string]*sdkws.KeyValue)
|
||||
for _, v := range req.ReactionExtensionList {
|
||||
|
||||
temp := new(sdkws.KeyValue)
|
||||
if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
|
||||
utils.CopyStructFields(temp, &vv)
|
||||
if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
|
||||
continue
|
||||
}
|
||||
temp.TypeKey = v.TypeKey
|
||||
setValue[v.TypeKey] = temp
|
||||
}
|
||||
err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
if err != nil {
|
||||
for _, value := range setValue {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = err.Error()
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
} else {
|
||||
for _, value := range setValue {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
}
|
||||
lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
if lockErr != nil {
|
||||
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
}
|
||||
|
||||
}
|
||||
notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return &rResp, nil
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
//var rResp msg.DeleteMessagesReactionExtensionsResp
|
||||
//callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
|
||||
//if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
||||
// rResp.ErrCode = int32(callbackResp.ErrCode)
|
||||
// rResp.ErrMsg = callbackResp.ErrMsg
|
||||
// for _, value := range req.ReactionExtensionList {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = callbackResp.ErrMsg
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
//}
|
||||
////if ExternalExtension
|
||||
//if req.IsExternalExtensions {
|
||||
// rResp.Result = callbackResp.ResultReactionExtensionList
|
||||
// notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
|
||||
// return &rResp, nil
|
||||
//
|
||||
//}
|
||||
//for _, v := range callbackResp.ResultReactionExtensions {
|
||||
// if v.ErrCode != 0 {
|
||||
// func(req *[]*sdkws.KeyValue, typeKey string) {
|
||||
// for i := 0; i < len(*req); i++ {
|
||||
// if (*req)[i].TypeKey == typeKey {
|
||||
// *req = append((*req)[:i], (*req)[i+1:]...)
|
||||
// }
|
||||
// }
|
||||
// }(&req.ReactionExtensionList, v.KeyValue.TypeKey)
|
||||
// rResp.Result = append(rResp.Result, v)
|
||||
// }
|
||||
//}
|
||||
//isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType)
|
||||
//if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.ReactionExtensionList {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
//}
|
||||
//
|
||||
//if isExists {
|
||||
// log.Debug(req.OperationID, "redis handle this delete", req.String())
|
||||
// for _, v := range req.ReactionExtensionList {
|
||||
// err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
|
||||
// if err != nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
// if err != nil && err != go_redis.Nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(redisValue, temp)
|
||||
// if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
// setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// } else {
|
||||
// newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
// if newErr != nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// }
|
||||
// setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
|
||||
// }
|
||||
// }
|
||||
//} else {
|
||||
// err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.ReactionExtensionList {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 200
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.ReactionExtensionList {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// setValue := make(map[string]*sdkws.KeyValue)
|
||||
// for _, v := range req.ReactionExtensionList {
|
||||
//
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
|
||||
// utils.CopyStructFields(temp, &vv)
|
||||
// if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
// setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// }
|
||||
// } else {
|
||||
// setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
// temp.TypeKey = v.TypeKey
|
||||
// setValue[v.TypeKey] = temp
|
||||
// }
|
||||
// err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
// if err != nil {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// } else {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// }
|
||||
// lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
// if lockErr != nil {
|
||||
// log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
// }
|
||||
//
|
||||
//}
|
||||
//notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return resp, nil
|
||||
}
|
||||
|
2
pkg/common/db/cache/group.go
vendored
2
pkg/common/db/cache/group.go
vendored
@ -52,7 +52,7 @@ type GroupCacheRedis struct {
|
||||
rcClient *rockscache.Client
|
||||
}
|
||||
|
||||
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCacheRedisInterface {
|
||||
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelation2.SuperGroupModelInterface, opts rockscache.Options) GroupCache {
|
||||
return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
|
||||
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB,
|
||||
mongoDB: mongoClient,
|
||||
|
2
pkg/common/db/cache/redis.go
vendored
2
pkg/common/db/cache/redis.go
vendored
@ -218,7 +218,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
|
||||
}
|
||||
}
|
||||
if len(failedMsgs) != 0 {
|
||||
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx)))
|
||||
return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedMsgs, tracelog.GetOperationID(ctx)))
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return 0, err
|
||||
|
@ -134,6 +134,14 @@ message getPaginationFriendsApplyFromResp{
|
||||
int32 total = 2;
|
||||
}
|
||||
|
||||
message getFriendIDsReq {
|
||||
string userID = 1;
|
||||
}
|
||||
|
||||
message getFriendIDsResp {
|
||||
repeated string = 1;
|
||||
}
|
||||
|
||||
service friend{
|
||||
//申请加好友
|
||||
rpc applyToAddFriend(applyToAddFriendReq) returns(applyToAddFriendResp);
|
||||
@ -163,9 +171,6 @@ service friend{
|
||||
rpc getDesignatedFriends(getDesignatedFriendsReq) returns(getDesignatedFriendsResp);
|
||||
//获取指定好友信息 有id不存在也返回错误
|
||||
rpc getPaginationFriends(getPaginationFriendsReq) returns (getPaginationFriendsResp);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// 获取好友ID列表
|
||||
rpc getFriendIDs(getFriendIDsReq) returns (getFriendIDsResp);
|
||||
}
|
@ -283,11 +283,11 @@ message GetUserInGroupMembersResp{
|
||||
repeated sdkws.GroupMemberFullInfo members = 1;
|
||||
}
|
||||
|
||||
message GetGroupMemberUserIDReq{
|
||||
message GetGroupMemberUserIDsReq{
|
||||
string groupID = 1;
|
||||
}
|
||||
|
||||
message GetGroupMemberUserIDResp{
|
||||
message GetGroupMemberUserIDsResp{
|
||||
repeated string userIDs = 1;
|
||||
}
|
||||
|
||||
@ -356,7 +356,7 @@ service group{
|
||||
//获取某个用户在指定群中的信息
|
||||
rpc getUserInGroupMembers(GetUserInGroupMembersReq) returns (GetUserInGroupMembersResp);
|
||||
//获取群成员用户ID
|
||||
rpc getGroupMemberUserID(GetGroupMemberUserIDReq) returns (GetGroupMemberUserIDResp);
|
||||
rpc getGroupMemberUserIDs(GetGroupMemberUserIDsReq) returns (GetGroupMemberUserIDsResp);
|
||||
//查询群组中对应级别的成员
|
||||
rpc GetGroupMemberRoleLevel(GetGroupMemberRoleLevelReq)returns (GetGroupMemberRoleLevelResp);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user